
Data Abstraction, Data Exchange, Credit, and Backpressure in Flink Explained

王知无-import_bigdata
Published 2023-04-07 18:59:44

I. Data Flow: Flink's Data Abstractions and Data Exchange Process

This part explains how Flink defines data at the lowest level and how it passes that data between operators.

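Of these, the first constructor calls checkBufferAndGetAddress(), which obtains the memory address of a direct buffer; this is what allows the segment to operate on off-heap memory.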
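To make the heap versus off-heap distinction concrete, here is a minimal sketch that uses only the JDK's java.nio.ByteBuffer. It does not use Flink's own segment classes, and the class name DirectBufferSketch and the roundTrip helper are made up for illustration.

```java
import java.nio.ByteBuffer;

// Illustrative only: plain JDK buffers, not Flink's memory segment classes.
final class DirectBufferSketch {

    /** Writes a long at offset 0 of the given buffer and reads it back. */
    static long roundTrip(ByteBuffer buffer, long value) {
        buffer.putLong(0, value);
        return buffer.getLong(0);
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(32);         // backed by a byte[] on the JVM heap
        ByteBuffer direct = ByteBuffer.allocateDirect(32); // backed by native (off-heap) memory

        System.out.println("heap.isDirect()   = " + heap.isDirect());
        System.out.println("direct.isDirect() = " + direct.isDirect());
        System.out.println("value read back   = " + roundTrip(direct, 42L));
    }
}
```

The same absolute put/get calls work on both kinds of buffer; only the backing memory differs.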

1.2 ByteBuffer and NetworkBufferPool
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private volatile int refCnt = 1;
    
    ......
}
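The refCnt field above starts at 1 because a freshly created buffer has exactly one owner. As a rough sketch of how such a counter is typically used (this is not Netty's actual implementation; the class RefCountedBufferSketch and its recycled flag are assumptions for illustration), a buffer can be handed back to its pool once the last reference is released:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted buffer; a sketch, not Netty's code.
final class RefCountedBufferSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer has one reference
    private volatile boolean recycled = false;

    /** Adds one reference; fails if the buffer has already been released. */
    void retain() {
        int current;
        do {
            current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
        } while (!refCnt.compareAndSet(current, current + 1));
    }

    /** Drops one reference; returns true if this call released the buffer. */
    boolean release() {
        int current;
        do {
            current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
        } while (!refCnt.compareAndSet(current, current - 1));
        if (current == 1) {
            recycled = true; // last reference gone: a real buffer would go back to its pool here
            return true;
        }
        return false;
    }

    int refCnt() { return refCnt.get(); }

    boolean isRecycled() { return recycled; }
}
```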

The NetworkBufferPool constructor pre-allocates all of its memory segments as direct buffers:

public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

        ......
        
        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }

        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
                availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
            }
        }

        ......
        
        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb, availableMemorySegments.size(), segmentSize);
    }
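The constructor above boils down to "allocate a fixed number of equally sized direct buffers up front and keep them in a bounded queue". A simplified sketch of that idea, using plain ByteBuffer instead of MemorySegment (the class SegmentPoolSketch and its method names are hypothetical), might look like this:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical fixed-size pool of direct buffers; a sketch of the idea only.
final class SegmentPoolSketch {
    private final ArrayBlockingQueue<ByteBuffer> available;

    SegmentPoolSketch(int numberOfSegments, int segmentSize) {
        available = new ArrayBlockingQueue<>(numberOfSegments);
        for (int i = 0; i < numberOfSegments; i++) {
            available.add(ByteBuffer.allocateDirect(segmentSize)); // all memory is claimed up front
        }
    }

    /** Returns a free segment, or null if the pool is exhausted (non-blocking). */
    ByteBuffer requestSegment() {
        return available.poll();
    }

    /** Resets a segment and puts it back into the pool. */
    void recycle(ByteBuffer segment) {
        segment.clear();
        available.add(segment);
    }

    int availableSegments() {
        return available.size();
    }
}
```

Because nothing is allocated after construction, the total network memory is bounded by numberOfSegments * segmentSize, which is what the log message in the original constructor reports.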
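
NetworkBufferPool.redistributeBuffers() divides the segments that are not strictly required among the registered local buffer pools:
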
  private void redistributeBuffers() throws IOException {
        assert Thread.holdsLock(factoryLock);

        // All buffers, which are not among the required ones
        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

        if (numAvailableMemorySegment == 0) {
            // in this case, we need to redistribute buffers so that every pool gets its minimum
            for (LocalBufferPool bufferPool : allBufferPools) {
                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
            }
            return;
        }

        long totalCapacity = 0; // long to avoid int overflow

        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();
            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
        }

        // no capacity to receive additional buffers?
        if (totalCapacity == 0) {
            return; // necessary to avoid div by zero when nothing to re-distribute
        }

        final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
                Math.min(numAvailableMemorySegment, totalCapacity));

        long totalPartsUsed = 0; // of totalCapacity
        int numDistributedMemorySegment = 0;
        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();

            // shortcut
            if (excessMax == 0) {
                continue;
            }

            totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);


            final int mySize = MathUtils.checkedDownCast(
                    memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

            numDistributedMemorySegment += mySize;
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
        }

        assert (totalPartsUsed == totalCapacity);
        assert (numDistributedMemorySegment == memorySegmentsToDistribute);
    }
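The arithmetic in redistributeBuffers() is easier to follow with concrete numbers. The sketch below replays the same proportional split on plain int arrays instead of LocalBufferPool objects; the class RedistributeSketch and its parameter names are assumptions made for this illustration.

```java
import java.util.Arrays;

// Replays the proportional split on arrays; a sketch, not Flink's class.
final class RedistributeSketch {

    /** Returns the new size of each pool given its required minimum and its maximum. */
    static int[] redistribute(int totalSegments, int[] required, int[] max) {
        int[] sizes = Arrays.copyOf(required, required.length); // every pool gets at least its minimum
        int totalRequired = 0;
        for (int r : required) {
            totalRequired += r;
        }
        int available = totalSegments - totalRequired;
        if (available <= 0) {
            return sizes; // nothing left beyond the minimums
        }

        long totalCapacity = 0; // how many extra segments the pools could take in total
        for (int i = 0; i < required.length; i++) {
            totalCapacity += Math.min(available, max[i] - required[i]);
        }
        if (totalCapacity == 0) {
            return sizes; // avoids division by zero
        }

        long toDistribute = Math.min(available, totalCapacity);
        long totalPartsUsed = 0;
        long distributed = 0;
        for (int i = 0; i < required.length; i++) {
            int excessMax = max[i] - required[i];
            if (excessMax == 0) {
                continue;
            }
            totalPartsUsed += Math.min(available, excessMax);
            // cumulative share so far, minus what earlier pools already received
            long mySize = toDistribute * totalPartsUsed / totalCapacity - distributed;
            distributed += mySize;
            sizes[i] = required[i] + (int) mySize;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 10 segments, two pools that each require 2 and may grow to 4 and 10
        System.out.println(Arrays.toString(
                redistribute(10, new int[] {2, 2}, new int[] {4, 10}))); // prints [3, 7]
    }
}
```

In this example 6 segments are spare and the pools could absorb 2 and 6 extra segments respectively, so the first pool receives 6 * 2 / 8 = 1 and the second receives the remaining 5. Working with the cumulative share is what keeps the integer rounding from losing segments.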
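
The main fields of LocalBufferPool:

    /** Minimum number of memory segments this pool requires. */
    private final int numberOfRequiredMemorySegments;

    /**
     * Segments this pool has already obtained that are still empty (no data written yet).
     */
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    /**
     * All registered listeners that watch for buffer availability.
     */
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    /** Maximum number of segments that can be allocated to this pool. */
    private final int maxNumberOfMemorySegments;

    /** Current size of this pool. */
    private int currentPoolSize;

    /**
     * Number of segments allocated via the NetworkBufferPool and referenced by this pool
     * (not obtained directly).
     */
    private int numberOfRequestedMemorySegments;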

LocalBufferPool.setNumBuffers() resizes the pool and hands back any excess segments:

 public void setNumBuffers(int numBuffers) throws IOException {
        synchronized (availableMemorySegments) {
            checkArgument(numBuffers >= numberOfRequiredMemorySegments,
                    "Buffer pool needs at least %s buffers, but tried to set to %s",
                    numberOfRequiredMemorySegments, numBuffers);

            if (numBuffers > maxNumberOfMemorySegments) {
                currentPoolSize = maxNumberOfMemorySegments;
            } else {
                currentPoolSize = numBuffers;
            }

            returnExcessMemorySegments();

            // If there is a registered owner and we have still requested more buffers than our
            // size, trigger a recycle via the owner.
            if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
                owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
            }
        }
    }

RecordWriter.sendToTarget() serializes a record into the buffers of its target channel:

 private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }

II. The Data Exchange Process

The previous part covered the data abstractions at each layer; this part covers how data is exchanged between tasks.

1. Overall Process

[Figure: the overall data exchange process between tasks]

//RecordWriterOutput.java
    @Override
    public void collect(StreamRecord<OUT> record) {
        if (this.outputTag != null) {
            // we are only responsible for emitting to the main input
            return;
        }
        // here the record is handed over to the RecordWriter
        pushToRecordWriter(record);
    }

The RecordWriter then sends the data to the corresponding channel.

//RecordWriter.java
    public void emit(T record) throws IOException, InterruptedException {
        // this is where the ChannelSelector comes in
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
    
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {

        // pick this channel's serializer and serialize the record
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            // write into the channel's buffer
            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
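The while loop in sendToTarget() exists because one serialized record may not fit into the current buffer and has to continue in a freshly requested one. The following sketch shows only that spanning behaviour, with heap ByteBuffers standing in for Flink's BufferBuilder; the class SpanningRecordSketch is hypothetical. As a simplification it closes the last, partially filled buffer, whereas the loop above only finishes a buffer once it is full.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a record spanning several fixed-size buffers.
final class SpanningRecordSketch {

    /**
     * Copies a serialized record into buffers of bufferSize bytes (bufferSize > 0),
     * requesting a new buffer whenever the current one is full.
     * Returns the buffers in write order, each flipped for reading.
     */
    static List<ByteBuffer> write(byte[] serializedRecord, int bufferSize) {
        List<ByteBuffer> finished = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(bufferSize);
        int offset = 0;
        while (offset < serializedRecord.length) {
            if (!current.hasRemaining()) {
                current.flip();
                finished.add(current);                     // full buffer is finished
                current = ByteBuffer.allocate(bufferSize); // "request" the next buffer
            }
            int n = Math.min(serializedRecord.length - offset, current.remaining());
            current.put(serializedRecord, offset, n);
            offset += n;
        }
        current.flip();
        finished.add(current); // simplification: also close the last, partially filled buffer
        return finished;
    }

    public static void main(String[] args) {
        List<ByteBuffer> buffers = write(new byte[10], 4);
        for (ByteBuffer b : buffers) {
            System.out.println(b.remaining() + " bytes");
        }
    }
}
```

A 10-byte record written into 4-byte buffers ends up spread over three buffers holding 4, 4, and 2 bytes.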

Next comes the step that pushes the data down to the underlying infrastructure (Netty):

//ResultPartition.java
    @Override
    public void flushAll() {
        for (ResultSubpartition subpartition : subpartitions) {
            subpartition.flush();
        }
    }
    
    //PartitionRequestQueue.java
    void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        // the push is handed off to the Netty server thread here
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                ctx.pipeline().fireUserEventTriggered(reader);
            }
        });
    }

The Netty side:

//AbstractChannelHandlerContext.java
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        if (event == null) {
            throw new NullPointerException("event");
        } else {
            final AbstractChannelHandlerContext next = this.findContextInbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeUserEventTriggered(event);
            } else {
                executor.execute(new OneTimeTask() {
                    public void run() {
                        next.invokeUserEventTriggered(event);
                    }
                });
            }

            return this;
        }
    }

Finally, the actual write:

    //PartitionRequestQueue.java
    private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
        if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
            return;
        }
        // Queue an available reader for consumption. If the queue is empty,
        // we try trigger the actual write. Otherwise this will be handled by
        // the writeAndFlushNextMessageIfPossible calls.
        boolean triggerWrite = availableReaders.isEmpty();
        registerAvailableReader(reader);

        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible(ctx.channel());
        }
    }
    
    private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
        
        ......

                next = reader.getNextBuffer();
                if (next == null) {
                    if (!reader.isReleased()) {
                        continue;
                    }
                    markAsReleased(reader.getReceiverId());

                    Throwable cause = reader.getFailureCause();
                    if (cause != null) {
                        ErrorResponse msg = new ErrorResponse(
                            new ProducerFailedException(cause),
                            reader.getReceiverId());

                        ctx.writeAndFlush(msg);
                    }
                } else {
                    // This channel was now removed from the available reader queue.
                    // We re-add it into the queue if it is still available
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }

                    BufferResponse msg = new BufferResponse(
                        next.buffer(),
                        reader.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

                    if (isEndOfPartitionEvent(next.buffer())) {
                        reader.notifySubpartitionConsumed();
                        reader.releaseAllResources();

                        markAsReleased(reader.getReceiverId());
                    }

                    // Write and flush and wait until this is done before
                    // trying to continue with the next buffer.
                    channel.writeAndFlush(msg).addListener(writeListener);

                    return;
                }
        
        ......
        
    }

On the receiving side, the client handler decodes the incoming message:

  //CreditBasedPartitionRequestClientHandler.java
    private void decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        // ---- Buffer --------------------------------------------------------
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();

                cancelRequestFor(bufferOrEvent.receiverId);

                return;
            }

            decodeBufferOrEvent(inputChannel, bufferOrEvent);

        } 
        
        ......
        
    }

The received bytes are then deserialized back into stream elements:

//StreamElementSerializer.java
    public StreamElement deserialize(DataInputView source) throws IOException {
        int tag = source.readByte();
        if (tag == TAG_REC_WITH_TIMESTAMP) {
            long timestamp = source.readLong();
            return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
        }
        else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
            return new StreamRecord<T>(typeSerializer.deserialize(source));
        }
        else if (tag == TAG_WATERMARK) {
            return new Watermark(source.readLong());
        }
        else if (tag == TAG_STREAM_STATUS) {
            return new StreamStatus(source.readInt());
        }
        else if (tag == TAG_LATENCY_MARKER) {
            return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
        }
        else {
            throw new IOException("Corrupt stream, found tag: " + tag);
        }
    }
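The deserialize() method above relies on a leading tag byte to tell the element types apart. A stripped-down round trip of that idea is sketched below with plain ByteBuffer and two element kinds only. The tag values, the class TaggedElementSketch, and the string payload format are assumptions for illustration and are not Flink's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical tag-prefixed encoding; not Flink's actual wire format.
final class TaggedElementSketch {
    static final byte TAG_RECORD_WITH_TIMESTAMP = 0; // made-up tag values
    static final byte TAG_WATERMARK = 1;

    /** Encodes tag, timestamp, payload length, and UTF-8 payload. */
    static byte[] writeRecord(String value, long timestamp) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 4 + payload.length);
        buf.put(TAG_RECORD_WITH_TIMESTAMP).putLong(timestamp).putInt(payload.length).put(payload);
        return buf.array();
    }

    /** Encodes tag and watermark timestamp. */
    static byte[] writeWatermark(long timestamp) {
        return ByteBuffer.allocate(1 + 8).put(TAG_WATERMARK).putLong(timestamp).array();
    }

    /** Reads the tag first, then decodes the rest according to that tag. */
    static String read(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte tag = buf.get();
        if (tag == TAG_RECORD_WITH_TIMESTAMP) {
            long timestamp = buf.getLong();
            byte[] payload = new byte[buf.getInt()];
            buf.get(payload);
            return "Record(" + new String(payload, StandardCharsets.UTF_8) + ", " + timestamp + ")";
        } else if (tag == TAG_WATERMARK) {
            return "Watermark(" + buf.getLong() + ")";
        } else {
            throw new IllegalStateException("Corrupt stream, found tag: " + tag);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(writeRecord("a", 5L))); // prints Record(a, 5)
        System.out.println(read(writeWatermark(7L)));   // prints Watermark(7)
    }
}
```

Putting the tag first means records and control elements such as watermarks can share one byte stream, and an unknown tag is detected immediately as corruption.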

The element is then processed once again in the StreamInputProcessor.processInput() loop.

This completes the walk-through of how data flows between nodes across JVMs.

III. Notes on Credit

1. The Backpressure Problem

So how does Flink handle backpressure? The answer, again, lies in these buffer pools.

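The figure illustrates roughly what happens when Flink produces and consumes data. When emitting and receiving data, ResultPartition and InputGate each have to request MemorySegments from the NetworkBufferPool to serve as their buffer pool.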
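Why a bounded buffer pool produces backpressure can be seen in a few lines: once every buffer is occupied, the producer cannot emit until the consumer frees one. The sketch below models this with a bounded JDK queue; the class BackpressureSketch and its methods are hypothetical stand-ins, not Flink classes, and a real producer would block or wait rather than get a false return value.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical bounded channel: a stand-in for a limited set of network buffers.
final class BackpressureSketch {
    private final ArrayBlockingQueue<String> buffers;

    BackpressureSketch(int numberOfBuffers) {
        buffers = new ArrayBlockingQueue<>(numberOfBuffers);
    }

    /** Producer side: returns false when no buffer is free. */
    boolean tryEmit(String record) {
        return buffers.offer(record);
    }

    /** Consumer side: takes one record out and frees its buffer; null if empty. */
    String consume() {
        return buffers.poll();
    }

    public static void main(String[] args) {
        BackpressureSketch channel = new BackpressureSketch(2);
        System.out.println(channel.tryEmit("a")); // true
        System.out.println(channel.tryEmit("b")); // true
        System.out.println(channel.tryEmit("c")); // false: both buffers are in use
        channel.consume();                        // the consumer frees one buffer
        System.out.println(channel.tryEmit("c")); // true
    }
}
```

The producer's rate is therefore capped by the consumer's rate without any explicit rate-limiting logic.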

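Credit-based flow control is exactly such a flow model: it is built on credit (the capacity to consume data) and works per virtual link rather than end to end, as shown in the figure below: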

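First, the downstream side sends the upstream side a credit message announcing its current credit (think of the available limit on a credit card). The upstream side then uses this message to decide how much data to send downstream. Whenever upstream sends data downstream, it deducts the corresponding amount from the downstream side's credit balance.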

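As the figure above shows, (a) is the connection-oriented flow design and (b) is the end-to-end flow design. With design (a), when downstream node 3 has to buffer data and postpone processing for some reason, every upstream node (1 and 2) can use its own buffers to hold data; with the end-to-end design (b), only node 3's buffers can be used to hold data (readers may want to think about why, from an implementation point of view).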
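The credit exchange described above can be simulated in a few lines. This is a single-threaded sketch under simplifying assumptions: one credit equals one buffer, credit messages are plain method calls, and the classes Sender and Receiver are hypothetical rather than Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded model of credit-based flow control.
final class CreditFlowSketch {

    static final class Sender {
        private final Queue<String> backlog = new ArrayDeque<>();
        private int credit = 0; // credit granted by the receiver and not yet used

        void enqueue(String data) { backlog.add(data); }

        /** Handles a credit message from the receiver. */
        void addCredit(int granted) { credit += granted; }

        int backlogSize() { return backlog.size(); }

        /** Sends as many buffers as the credit allows; each one costs one credit. */
        int sendTo(Receiver receiver) {
            int sent = 0;
            while (credit > 0 && !backlog.isEmpty()) {
                receiver.onBuffer(backlog.poll());
                credit--;
                sent++;
            }
            return sent;
        }
    }

    static final class Receiver {
        private final Queue<String> received = new ArrayDeque<>();
        private final int capacity;

        Receiver(int capacity) { this.capacity = capacity; }

        /** Number of free buffers, i.e. the credit the receiver can announce. */
        int freeBuffers() { return capacity - received.size(); }

        void onBuffer(String data) {
            if (received.size() >= capacity) {
                throw new IllegalStateException("sender exceeded its credit");
            }
            received.add(data);
        }

        /** Processes one buffer, which frees one unit of credit. */
        String process() { return received.poll(); }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver(2);
        Sender sender = new Sender();
        for (int i = 0; i < 5; i++) {
            sender.enqueue("buffer-" + i);
        }
        sender.addCredit(receiver.freeBuffers());    // initial credit: 2
        System.out.println(sender.sendTo(receiver)); // prints 2
        System.out.println(sender.sendTo(receiver)); // prints 0 (no credit left)
        receiver.process();                          // one buffer is freed downstream
        sender.addCredit(1);                         // and announced as new credit
        System.out.println(sender.sendTo(receiver)); // prints 1
    }
}
```

Data that cannot be sent stays in the sender's backlog, which matches design (a): each upstream node holds its own pending data until the downstream side announces capacity.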

Originally published: 2023-04-04. In case of infringement, please contact cloudcommunity@tencent.com for removal.

This article is shared from the WeChat official account 大数据技术与架构.
