Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。既然是对流式数据进行处理,那么就要面临数据在流动计算时,上下游数据通信以及数据处理速度不一致所带来的问题。 本文先从「生产者-消费者模式」的角度介绍了Flink中的数据传输,从而引出了「反压」的概念。接着介绍了Flink在V1.5前「基于TCP的反压机制」以及V1.5后「基于Credit的反压机制」分别如何实现网络流控。最后针对一个反压案例进行分析,介绍了如何进行反压定位和资源调优,并展示了调优结果。 希望在阅读完本文后,读者可以深入理解Flink节点反压的概念以及背后的原理,在遇到反压场景时,能够快速定位瓶颈点,并拥有一套基本的调优思路。
Flink作业在运行状态时,数据会在各个TaskManager(TM)之间流动交换,上游TM到下游TM的数据传输,可以简单看作是生产者&消费者模式。 下面将会介绍 Producer 和 Consumer 在吞吐率不同时,导致的普遍性问题。
假设 Producer的吞吐率为 2 MB/s,Consumer是 1 MB/s ,此时上游产生数据的速度 大于 下游处理数据的速度,且假设两端都存在Buffer,用来暂时存放数据,再假设底层网络传输速度为 2 MB/s。 若Buffer有界,经过5s后,Consumer 端的 Receive Buffer 会被打满,后面新到达的数据就只能被丢弃掉;但在实际场景中,通常生产者在发送数据前会检查 buffer 的可用状态,若 buffer 处于不可用状态,则不会发送新的数据。 面对上述问题,需要有一种动态反馈的机制,根据数据实时传输的情况,动态调整数据的发送速率和接收速率,从而更好的进行网络传输。 动态反馈可以分为以下两种:
通过上小节的介绍,我们了解到,当上游生产数据和下游消费数据速率不一致时,会导致一些问题,这时候需要一种「动态反馈」机制,下面引入「反压」的概念
「反压」是流式系统中关于数据处理能力的动态反馈机制,并且是从下游到上游的反馈,一般发生在实时数据处理的过程中,上游节点的生产速度大于下游节点的消费速度的情况下。
下面将会介绍在Flink中,TaskManager之间如何传输数据,看看 Flink 中数据传输的生产者-消费者模式的具体形式。
下图来源于Apache Flink (http://flink.apache.org) 图中相关概念: ResultPartition(RP) ResultSubPartition(RS) InputChannel(IC) InputGate(IG)
MapDriver 将数据传递给 RecordWriter,之后经由 ChannelSelector 做数据分发,将数据传递到一个或多个 RecordSerializer 做序列化操作,转换为二进制流。ChannelSelector 的分发有两种模式,一种是广播模式,会将数据发送到每个序列化器进行处理,另一种是按某种逻辑进行选择,比如计算数据的hash,然后路由到命中的序列化器。
/**
* The {@link ChannelSelector} determines to which logical channels a record should be written to.
*
* @param <T> the type of record which is sent through the attached output gate
*/
public interface ChannelSelector<T extends IOReadableWritable> {
/**
* Initializes the channel selector with the number of output channels.
*
* @param numberOfChannels the total number of output channels which are attached to respective
* output gate.
*/
void setup(int numberOfChannels);
/**
* Returns the logical channel index, to which the given record should be written. It is illegal
* to call this method for broadcast channel selectors and this method can remain not
* implemented in that case (for example by throwing {@link UnsupportedOperationException}).
*
* 选择模式
*
* @param record the record to determine the output channels for.
* @return an integer number which indicates the index of the output channel through which the
* record shall be forwarded.
*/
int selectChannel(T record);
/**
* Returns whether the channel selector always selects all the output channels.
* 广播模式
*
* @return true if the selector is for broadcast mode.
*/
boolean isBroadcast();
}
public abstract class ResultPartition implements ResultPartitionWriter {
protected final ResultPartitionID partitionId;
/** 该分区的类型,定义要使用的具体子分区实现 */
protected final ResultPartitionType partitionType;
protected final ResultPartitionManager partitionManager;
/** Subpartition 的个数 */
protected final int numSubpartitions;
// - Runtime state --------------------------------------------------------
/** ResultPartition 中的缓冲区 */
protected BufferPool bufferPool;
}
/**
* An input channel consumes a single {@link ResultSubpartitionView}.
*
* <p>For each channel, the consumption life cycle is as follows:
*
* <ol>
* <li>{@link #requestSubpartition()}
* <li>{@link #getNextBuffer()}
* <li>{@link #releaseAllResources()}
* </ol>
*/
public abstract class InputChannel {
/** 输入通道的信息,以便在任务中全局识别它. */
protected final InputChannelInfo channelInfo;
/** 此通道消费接收的RP编号. */
protected final ResultPartitionID partitionId;
/** 此通道使用的子分区的索引. */
protected final int consumedSubpartitionIndex;
protected final SingleInputGate inputGate;
}
其实这是典型的生产者-消费者模式,上游生产数据到 ResultPartition(由ResultSubpartition构成) 中,下游通过 InputGate (由InputChannel构成)消费数据。不同的 task 可能在同一个 TaskManager 中运行,此时这些task可以看做是同一个 TaskManager进程中的不同线程,可以在本地进行数据交换;不同的 task 也可能在不同的 TaskManger 中运行,此时就要通过TaskManager 间的网络通信进行数据交换。
前面介绍了 Flink 基于生产者-消费者模式的数据传输方式,且我们了解到,流式系统在处理数据时,如果上下游处理速度不一致,会出现数据堵塞等问题。这时候需要一种动态反馈的机制,根据数据实时传输的情况,动态调整数据的发送速率和接受速率,从而更好的进行网络传输,即「网络流控」。 本章将会介绍 Flink 在V1.5前后进行网络流控的两种方式:
我们先来看看Flink在V1.5前是如何做动态反馈,进而实现网络流控的。 基于TCP的反压机制底层依赖于「TCP的滑动窗口算法」,本章不会赘述,而会重点描述反压现象的传递过程。
由上图可见,每个TaskManager中都会有个被内部所有task共享的 Network Buffer Pool,它从堆外内存申请内存资源,之后可以为每个 ResultSubpartition 创建 Local Buffer Pool。 假设生产者的速率是 2 MB/S,消费者的速率是 1 MB/S。下面会描述,由于速度不匹配,各层buffer被打满,从而引起反压的过程。
一段时间后,会达到下图的状态,此时 InputChannel 暂时被打满,需要向 Local Buffer Pool 申请新的 buffer,此时 Local Buffer Pool 里的一个 buffer 被标记为 Used。
由上下游处理速率不一致,一段时间过后,InputChannel 将 Local Buffer Pool 的内存申请完了,此时 Local Buffer Pool 的所有 buffer 都被标记为 Used,但还可以向 Network Buffer Pool 继续申请 buffer。
渐渐 Network Buffer Pool 也没有可用 buffer 了,全都变成了 Used,此时消费者无法再读取数据了,Netty也不会接收Socket的数据了。
当消费者的 socket 被用尽,此时会将 windows=0 发送给生产者的发送端(TCP滑动窗口),此时socket会停止发送数据。
不久socket buffer用尽,Netty检测到后会停止向socket发送数据,之后由于 RecordWriter 还在发送数据,这些数据会堆积在Netty Buffer中,到一定程度后,Netty会变为不可写状态,ResultSubpartition 发送数据前都会检测 Netty是否可写,此时 ResultSubpartition 会停止向 Netty 中写数据。
ResultSubpartition 的空间很快被用尽,直到 Local Buffer Pool 和 Network Buffer Pool 的 buffer都被打满后,RecordWriter 就会停止写数据,至此,完成了跨TaskManager的反压。
当一个 Task 的缓冲池用尽之后,网络连接就处于阻塞状态,上游 Task 无法产出数据,下游 Task 无法接收数据,也就是我们所说的「反压」状态。 但是基于TCP的反压机制有以下问题:
为了解决上述问题,Flink 1.5 重构了网络栈,引入了“基于信用值的流量控制算法”(Credit-based Flow Control),即在Flink层实现网络流控,缩短反压链路,且确保 TaskManager 之间的网络连接始终不会处于阻塞状态。 Credit-based Flow Control 的思路其实很简单,它在接收端和发送端之间建立一种类似“信用评级”的机制,发送端向接收端发送的数据永远不会超过接收端的信用值的大小。对于 Flink来说,信用值就是接收端TaskManager 可用的 Buffer 的数量,这样就可以保证发送端 TaskManager 不会向 TCP 连接中发送超出接收端缓冲区可用容量的数据。 基于Credit 实现流量控制 的具体机制为:
基于Credit算法的反压机制,解决了两个问题:
本文首先介绍了Flink中跨TaskManager的数据传输,引出了「生产者-消费者模式」在吞吐率不同时,导致的普遍性问题,以及「动态反馈」机制的必要性,并明确了「反压」的概念,「反压」是流式系统中关于处理能力的动态反馈机制,并且是从下游到上游的反馈。 接着介绍了Flink的网络流控机制,Flink在V1.5前,「基于TCP的滑动窗口机制」实现反压,但是存在单个Task反压会导致整个TaskManager共享的Socket不可用,而且反压链路较长,动态反馈机制较为迟钝等缺点。Flink在V1.5后,采用「基于Credit算法的反压机制」,在ResultPartition层实现反压,提高了反压效率。
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks https://www.ververica.com/blog/how-flink-handles-backpressure https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn