首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink原理 | Flink中的数据抽象及数据交换过程

Flink原理 | Flink中的数据抽象及数据交换过程

作者头像
王知无-import_bigdata
发布2019-12-05 15:18:42
2K1
发布2019-12-05 15:18:42
举报

By 大数据技术与架构

场景描述:Flink作为一个高效的流框架,为了避免JVM的固有缺陷(java对象存储密度低,FGC影响吞吐和响应等),必然走上自主管理内存的道路。

关键词:数据抽象 内存管理

Flink的数据抽象
MemorySegment

Flink作为一个高效的流框架,为了避免JVM的固有缺陷(java对象存储密度低,FGC影响吞吐和响应等),必然走上自主管理内存的道路。

这个MemorySegment就是Flink的内存抽象。默认情况下,一个MemorySegment可以被看做是一个32kb大的内存块的抽象。这块内存既可以是JVM里的一个byte[],也可以是堆外内存(DirectByteBuffer)。

如果说byte[]数组和direct memory是最底层的存储,那么memorysegment就是在其上覆盖的一层统一抽象。它定义了一系列抽象方法,用于控制和底层内存的交互,如:

public abstract class MemorySegment {

    public abstract byte get(int index);
    
    public abstract void put(int index, byte b);
    
    public int size() ;
    
    public abstract ByteBuffer wrap(int offset, int length);
    
    ......
}

我们可以看到,它在提供了诸多直接操作内存的方法外,还提供了一个wrap()方法,将自己包装成一个ByteBuffer,我们待会儿讲这个ByteBuffer。

Flink为MemorySegment提供了两个实现类:HeapMemorySegment和HybridMemorySegment。他们的区别在于前者只能分配堆内存,而后者能用来分配堆内和堆外内存。事实上,Flink框架里,只使用了后者。这是为什么呢?

如果HybridMemorySegment只能用于分配堆外内存的话,似乎更合常理。但是在JVM的世界中,如果一个方法是一个虚方法,那么每次调用时,JVM都要花时间去确定调用的到底是哪个子类实现的该虚方法(方法重写机制,不明白的去看JVM的invokeVirtual指令),也就意味着每次都要去翻方法表;而如果该方法虽然是个虚方法,但实际上整个JVM里只有一个实现(就是说只加载了一个子类进来),那么JVM会很聪明的把它去虚化处理,这样就不用每次调用方法时去找方法表了,能够大大提升性能。但是只分配堆内或者堆外内存不能满足我们的需要,所以就出现了HybridMemorySegment同时可以分配两种内存的设计。

我们可以看看HybridMemorySegment的构造代码:

HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }
    
        HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }

其中,第一个构造函数的checkBufferAndGetAddress()方法能够得到direct buffer的内存地址,因此可以操作堆外内存。

ByteBuffer与NetworkBufferPool

在MemorySegment这个抽象之上,Flink在数据从operator内的数据对象在向TaskManager上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是Buffer。

注意,这个Buffer是个flink接口,不是java.nio提供的那个Buffer抽象类。Flink在这一层面同时使用了这两个同名概念,用来存储对象,直接看代码时到处都是各种xxxBuffer很容易混淆:

  • java提供的那个Buffer抽象类在这一层主要用于构建HeapByteBuffer,这个主要是当数据从jvm里的一个对象被序列化成字节数组时用的;
  • Flink的这个Buffer接口主要是一种flink层面用于传输数据和事件的统一抽象,其实现类是NetworkBuffer,是对MemorySegment的包装。Flink在各个TaskManager之间传递数据时,使用的是这一层的抽象。 因为Buffer的底层是MemorySegment,这可能不是JVM所管理的,所以为了知道什么时候一个Buffer用完了可以回收,Flink引入了引用计数的概念,当确认这个buffer没有人引用,就可以回收这一片MemorySegment用于别的地方了(JVM的垃圾回收为啥不用引用计数?读者思考一下):
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private volatile int refCnt = 1;
    
    ......
}

为了方便管理NetworkBuffer,Flink提供了BufferPoolFactory,并且提供了唯一实现NetworkBufferPool,这是个工厂模式的应用。

NetworkBufferPool在每个TaskManager上只有一个,负责所有子task的内存管理。其实例化时就会尝试获取所有可由它管理的内存(对于堆内存来说,直接获取所有内存并放入老年代,并令用户对象只在新生代存活,可以极大程度的减少Full GC),我们看看其构造方法:

public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

        ......
        
        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }

        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
                availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
            }
        }

        ......
        
        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb, availableMemorySegments.size(), segmentSize);
    }

由于NetworkBufferPool只是个工厂,实际的内存池是LocalBufferPool。每个TaskManager都只有一个NetworkBufferPool工厂,但是上面运行的每个task都要有一个和其他task隔离的LocalBufferPool池,这从逻辑上很好理解。另外,NetworkBufferPool会计算自己所拥有的所有内存分片数,在分配新的内存池时对每个内存池应该占有的内存分片数重分配,步骤是:

  • 首先,从整个工厂管理的内存片中拿出所有的内存池所需要的最少Buffer数目总和 如果正好分配完,就结束
  • 其次,把所有的剩下的没分配的内存片,按照每个LocalBufferPool内存池的剩余想要容量大小进行按比例分配
  • 剩余想要容量大小是这么个东西:如果该内存池至少需要3个buffer,最大需要10个buffer,那么它的剩余想要容量就是7 实现代码如下:
private void redistributeBuffers() throws IOException {
        assert Thread.holdsLock(factoryLock);

        // All buffers, which are not among the required ones
        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

        if (numAvailableMemorySegment == 0) {
            // in this case, we need to redistribute buffers so that every pool gets its minimum
            for (LocalBufferPool bufferPool : allBufferPools) {
                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
            }
            return;
        }

        long totalCapacity = 0; // long to avoid int overflow

        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();
            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
        }

        // no capacity to receive additional buffers?
        if (totalCapacity == 0) {
            return; // necessary to avoid div by zero when nothing to re-distribute
        }

        final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
                Math.min(numAvailableMemorySegment, totalCapacity));

        long totalPartsUsed = 0; // of totalCapacity
        int numDistributedMemorySegment = 0;
        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();

            // shortcut
            if (excessMax == 0) {
                continue;
            }

            totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);


            final int mySize = MathUtils.checkedDownCast(
                    memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

            numDistributedMemorySegment += mySize;
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
        }

        assert (totalPartsUsed == totalCapacity);
        assert (numDistributedMemorySegment == memorySegmentsToDistribute);
    }

接下来说说这个LocalBufferPool内存池。 LocalBufferPool的逻辑想想无非是增删改查,值得说的是其fields:

/** 该内存池需要的最少内存片数目*/
    private final int numberOfRequiredMemorySegments;

    /**
     * 当前已经获得的内存片中,还没有写入数据的空白内存片
     */
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    /**
     * 注册的所有监控buffer可用性的监听器
     */
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    /** 能给内存池分配的最大分片数*/
    private final int maxNumberOfMemorySegments;

    /** 当前内存池大小 */
    private int currentPoolSize;

    /**
     * 所有经由NetworkBufferPool分配的,被本内存池引用到的(非直接获得的)分片数
     */
    private int numberOfRequestedMemorySegments;

承接NetworkBufferPool的重分配方法,我们来看看LocalBufferPool的setNumBuffers()方法,代码很短,逻辑也相当简单,就不展开说了:

public void setNumBuffers(int numBuffers) throws IOException {
        synchronized (availableMemorySegments) {
            checkArgument(numBuffers >= numberOfRequiredMemorySegments,
                    "Buffer pool needs at least %s buffers, but tried to set to %s",
                    numberOfRequiredMemorySegments, numBuffers);

            if (numBuffers > maxNumberOfMemorySegments) {
                currentPoolSize = maxNumberOfMemorySegments;
            } else {
                currentPoolSize = numBuffers;
            }

            returnExcessMemorySegments();

            // If there is a registered owner and we have still requested more buffers than our
            // size, trigger a recycle via the owner.
            if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
                owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
            }
        }
    }
RecordWriter与Record

我们接着往高层抽象走,刚刚提到了最底层内存抽象是MemorySegment,用于数据传输的是Buffer,那么,承上启下对接从Java对象转为Buffer的中间对象是什么呢?是StreamRecord。

从StreamRecord<T>这个类名字就可以看出来,这个类就是个wrap,里面保存了原始的Java对象。另外,StreamRecord还保存了一个timestamp。

那么这个对象是怎么变成LocalBufferPool内存池里的一个大号字节数组的呢?借助了StreamWriter这个类。

我们直接来看把数据序列化交出去的方法:

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");
        
        
        
        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }

先说最后一行,如果配置为flushAlways,那么会立刻把元素发送出去,但是这样吞吐量会下降;Flink的默认设置其实也不是一个元素一个元素的发送,是单独起了一个线程,每隔固定时间flush一次所有channel,较真起来也算是mini batch了。 再说序列化那一句:SerializationResult result = serializer.addRecord(record);。在这行代码中,Flink把对象调用该对象所属的序列化器序列化为字节数组。

数据流转过程

上一节讲了各层数据的抽象,这一节讲讲数据在各个task之间exchange的过程。

整体过程

看这张图:

1.第一步必然是准备一个ResultPartition; 2.通知JobMaster; 3.JobMaster通知下游节点;如果下游节点尚未部署,则部署之; 4.下游节点向上游请求数据 5.开始传输数据

数据跨task传递

本节讲一下算子之间具体的数据传输过程。也先上一张图:

数据在task之间传递有如下几步:

1.数据在本operator处理完后,交给RecordWriter。每条记录都要选择一个下游节点,所以要经过ChannelSelector。 2.每个channel都有一个serializer(我认为这应该是为了避免多线程写的麻烦),把这条Record序列化为ByteBuffer 3.接下来数据被写入ResultPartition下的各个subPartition里,此时该数据已经存入DirectBuffer(MemorySegment) 4.单独的线程控制数据的flush速度,一旦触发flush,则通过Netty的nio通道向对端写入 5.对端的netty client接收到数据,decode出来,把数据拷贝到buffer里,然后通知InputChannel 6.有可用的数据时,下游算子从阻塞醒来,从InputChannel取出buffer,再解序列化成record,交给算子执行用户代码 数据在不同机器的算子之间传递的步骤就是以上这些。

了解了步骤之后,再来看一下部分关键代码: 首先是把数据交给recordwriter。

//RecordWriterOutput.java
    @Override
    public void collect(StreamRecord<OUT> record) {
        if (this.outputTag != null) {
            // we are only responsible for emitting to the main input
            return;
        }
        //这里可以看到把记录交给了recordwriter
        pushToRecordWriter(record);
    }

然后recordwriter把数据发送到对应的通道。

//RecordWriter.java
    public void emit(T record) throws IOException, InterruptedException {
        //channelselector登场了
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
    
        private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        
        //选择序列化器并序列化数据
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            //写入channel
            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }

接下来是把数据推给底层设施(netty)的过程:

//ResultPartition.java
    @Override
    public void flushAll() {
        for (ResultSubpartition subpartition : subpartitions) {
            subpartition.flush();
        }
    }
    
        //PartitionRequestQueue.java
        void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        //这里交给了netty server线程去推
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                ctx.pipeline().fireUserEventTriggered(reader);
            }
        });
    }

netty相关的部分:

//AbstractChannelHandlerContext.java
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        if (event == null) {
            throw new NullPointerException("event");
        } else {
            final AbstractChannelHandlerContext next = this.findContextInbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeUserEventTriggered(event);
            } else {
                executor.execute(new OneTimeTask() {
                    public void run() {
                        next.invokeUserEventTriggered(event);
                    }
                });
            }

            return this;
        }
    }

最后真实的写入:

//PartittionRequesetQueue.java
    private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
        if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
            return;
        }
        // Queue an available reader for consumption. If the queue is empty,
        // we try trigger the actual write. Otherwise this will be handled by
        // the writeAndFlushNextMessageIfPossible calls.
        boolean triggerWrite = availableReaders.isEmpty();
        registerAvailableReader(reader);

        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible(ctx.channel());
        }
    }
    
    private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
        
        ......

                next = reader.getNextBuffer();
                if (next == null) {
                    if (!reader.isReleased()) {
                        continue;
                    }
                    markAsReleased(reader.getReceiverId());

                    Throwable cause = reader.getFailureCause();
                    if (cause != null) {
                        ErrorResponse msg = new ErrorResponse(
                            new ProducerFailedException(cause),
                            reader.getReceiverId());

                        ctx.writeAndFlush(msg);
                    }
                } else {
                    // This channel was now removed from the available reader queue.
                    // We re-add it into the queue if it is still available
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }

                    BufferResponse msg = new BufferResponse(
                        next.buffer(),
                        reader.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

                    if (isEndOfPartitionEvent(next.buffer())) {
                        reader.notifySubpartitionConsumed();
                        reader.releaseAllResources();

                        markAsReleased(reader.getReceiverId());
                    }

                    // Write and flush and wait until this is done before
                    // trying to continue with the next buffer.
                    channel.writeAndFlush(msg).addListener(writeListener);

                    return;
                }
        
        ......
        
    }

上面这段代码里第二个方法中调用的writeAndFlush(msg)就是真正往netty的nio通道里写入的地方了。在这里,写入的是一个RemoteInputChannel,对应的就是下游节点的InputGate的channels。

有写就有读,nio通道的另一端需要读入buffer,代码如下:

//CreditBasedPartitionRequestClientHandler.java
    private void decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        // ---- Buffer --------------------------------------------------------
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();

                cancelRequestFor(bufferOrEvent.receiverId);

                return;
            }

            decodeBufferOrEvent(inputChannel, bufferOrEvent);

        }
        
        ......
        
    }

插一句,Flink其实做阻塞和获取数据的方式非常自然,利用了生产者和消费者模型,当获取不到数据时,消费者自然阻塞;当数据被加入队列,消费者被notify。Flink的背压机制也是借此实现。

然后在这里又反序列化成StreamRecord:

//StreamElementSerializer.java
    public StreamElement deserialize(DataInputView source) throws IOException {
        int tag = source.readByte();
        if (tag == TAG_REC_WITH_TIMESTAMP) {
            long timestamp = source.readLong();
            return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
        }
        else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
            return new StreamRecord<T>(typeSerializer.deserialize(source));
        }
        else if (tag == TAG_WATERMARK) {
            return new Watermark(source.readLong());
        }
        else if (tag == TAG_STREAM_STATUS) {
            return new StreamStatus(source.readInt());
        }
        else if (tag == TAG_LATENCY_MARKER) {
            return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
        }
        else {
            throw new IOException("Corrupt stream, found tag: " + tag);
        }
    }

然后再次在StreamInputProcessor.processInput()循环中得到处理。

至此,数据在跨jvm的节点之间的流转过程就讲完了。

来源:jianshu.com/p/eaafc9db3f74

欢迎点赞+收藏+转发朋友圈素质三连

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink的数据抽象
    • MemorySegment
      • ByteBuffer与NetworkBufferPool
        • RecordWriter与Record
        • 数据流转过程
          • 整体过程
            • 数据跨task传递
            相关产品与服务
            文件存储
            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档