转载

Flink 数据的发送

Flink 数据的发送是通过 Collector 的collector 方法

public interface Collector<T>{
 
 /**
 * Emits a record.
 * 
 * @param record The record to collect.
 */
 void collect(T record);
 
 /**
 * Closes the collector. If any data was buffered, that data will be flushed.
 */
 void close();
}

其中 Output 拓展了 Collector

public interface Output<T> extends Collector<T>{

 /**
 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 * operators.
 *
 * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
 * timestamp will be emitted in the future.
 */
 void emitWatermark(Watermark mark);

 /**
 * Emits a record the side output identified by the given {@link OutputTag}.
 *
 * @param record The record to collect.
 */
 <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

 void emitLatencyMarker(LatencyMarker latencyMarker);
}

实现Output 的类有 RecordWriterOutput

public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>>{
 .....
 private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

 private SerializationDelegate<StreamElement> serializationDelegate;

 private final StreamStatusProvider streamStatusProvider;

 private final OutputTag outputTag;
 
 
 @Override
 public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record){
 if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
 // we are only responsible for emitting to the side-output specified by our
 // OutputTag.
 return;
 }

 pushToRecordWriter(record);
 }
 
 
 private <X> void pushToRecordWriter(StreamRecord<X> record){
 serializationDelegate.setInstance(record);

 try {
 recordWriter.emit(serializationDelegate);
 }
 catch (Exception e) {
 throw new RuntimeException(e.getMessage(), e);
 }
 }

}

StreamRecordWriter

public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T>{

 @Override
 public void emit(T record) throws IOException, InterruptedException{
 checkErroneous();
 super.emit(record);
 if (flushAlways) {
 flush();
 }
 }

}

RecordWriter

public class RecordWriter<T extends IOReadableWritable>{

 protected final ResultPartitionWriter targetPartition; // 用于真正写入到没有个partition

 private final ChannelSelector<T> channelSelector; //用于选着发送到哪一个channel


 /** {@link RecordSerializer} per outgoing channel */
 private final RecordSerializer<T>[] serializers; //每一个channel 的序列
}




public void emit(T record) throws IOException, InterruptedException{
 for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
 sendToTarget(record, targetChannel);
 }
 }
 
 private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException{
 RecordSerializer<T> serializer = serializers[targetChannel];

 synchronized (serializer) { //每一个channel serializer不能并发
 SerializationResult result = serializer.addRecord(record);

 while (result.isFullBuffer()) { //buffer,底层使用memorySegment 已满
 Buffer buffer = serializer.getCurrentBuffer(); //将其取出

 if (buffer != null) {
 numBytesOut.inc(buffer.getSizeUnsafe());
 writeAndClearBuffer(buffer, targetChannel, serializer);

 // If this was a full record, we are done. Not breaking
 // out of the loop at this point will lead to another
 // buffer request before breaking out (that would not be
 // a problem per se, but it can lead to stalls in the
 // pipeline).
 if (result.isFullRecord()) {
 break;
 }
 } else {
 buffer = targetPartition.getBufferProvider().requestBufferBlocking();
 result = serializer.setNextBuffer(buffer);
 }
 }
 }

 
 private void writeAndClearBuffer(
 Buffer buffer,
 int targetChannel,
 RecordSerializer<T> serializer) throws IOException {

 try {
 targetPartition.writeBuffer(buffer, targetChannel);
 }
 finally {
 serializer.clearCurrentBuffer();
 }
 }
}

writeAndClearBuffer 真正写入是通过ResultPartitionWriter 的writeBuffer

public class ResultPartitionWriter implements EventListener<TaskEvent>{
 private final ResultPartition partition; //分区的写入

 
 public void writeBuffer(Buffer buffer, int targetChannel) throws IOException{
 partition.add(buffer, targetChannel);
 }


}

ResultPartition 直接将buffer add

public class ResultPartition implements BufferPoolOwner{

 private final ResultSubpartition[] subpartitions; //对buffer 的add ,具体分为 pipeline or Spillable ,这是两种不同的机制 A pipelined in-memory only subpartition, which can be consumed once. A spillable sub partition starts out in-memory and spills to disk if asked to do so. 就是一种在内存里面,一种是刷新到磁盘上面,内存是仅仅只能获取一次


 public void add(Buffer buffer, int subpartitionIndex) throws IOException{
 boolean success = false;

 try {
 checkInProduceState();

 final ResultSubpartition subpartition = subpartitions[subpartitionIndex];

 synchronized (subpartition) {
 success = subpartition.add(buffer);

 // Update statistics
 totalNumberOfBuffers++;
 totalNumberOfBytes += buffer.getSize();
 }
 }
 finally {
 if (success) {
 notifyPipelinedConsumers();
 }
 else {
 buffer.recycle();
 }
 }
 }


}

ResultSubpartition 的add 时候就是将buffer 放入到一个queue 里面去同时notify reader 进行读取如果对于SpillableSubpartition则会刷入到磁盘,然后在返回

原文  http://happyer.github.io/2018/08/08/2018-08-08-flink-output/
正文到此结束
Loading...