上图为一个 Flink 作业的提交流程,主要可以分为以下几个步骤:
接下来我们以如下例子来剖析各个步骤具体的执行流程:
public static void main(String[] args) throws Exception {
// 创建 execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 data source
DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
// 将每个 ride 转为 Tuple2(driverId, 1)
DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> map(TaxiRide ride) {
return Tuple2.of(ride.driverId, 1L);
}
});
// 对 stream 根据 driverId 进行重新分区
KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
// 计算每个司机的 ride 数
DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
// 将结果进行打印
rideCounts.print();
// 开始运行
env.execute("Ride Count");
}
该过程发生在 Client 端
上述代码可以分为三个部分,即:
DataStreamSource
表示,是该拓扑的头结点List<Transformation<?>> transformations
表示DataStreamSink
表示,是该拓扑的尾结点我们分别就这三部分进行说明
add Source 相关的调用如下:
DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
如上,通过 DataStreamSource<OUT> StreamExecutionEnvironment#addSource(SourceFunction<OUT> function)
设置 SourceFunction 并返回一个 DataStreamSource。关于 SourceFunction、SourceContext 等请查看
SourceFunction & SourceContext & StreamSource
设置输入源env.addSource(new TaxiRideGenerator())
的流程如下:
总结一下,addSource 的主要流程如下:
以 map 操作(如下)为例,来说明 transformation 具体流程。在这里,map 将 TaxiRide 转换为 (ride.driverId, 1L)
的二元组
DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> map(TaxiRide ride) {
return Tuple2.of(ride.driverId, 1L);
}
});
主要流程如下:
map
之后的 keyBy
、sum
和 map
有类似的过程:
List<Transformation<?>> transformations
中returenDataStream: DataStream
最后的 rideCounts.print()
会触发 DataStream.addSink(...)
,其流程也如上面的几步,只是最终返回的类型是 DataStreamSink,并没有直接继承 DataStream,包含一个 transformation
。我们以下图来看 DataStream 从创建、转换、输出的过程(每个 Transformation 都有 id,从 1 开始,下面 5 个 Transformations 的 id 分别是 1、2、3、4、5):
List<Transformation>
。如上图中红线所示StreamExecutionEnvironment
的 List<Transformation<?>> #transformations
中;但并不需要 DataStream 的每一个转换调用都添加一个 Transformation 到该 list 中,根据上面提到的 input 指向关系,即使不添加不包含 Function 的 Transformation 也不会丢失任何一个 Transformationenv.execute("Ride Count")
会触发真正的执行,其内部主要分为两步:
generate()
方法生成 StreamGraph我们先来看看 StreamGraph 和 StreamGraphGenerator 的一些概况,再来详细看看 StreamGraphGenerator 是如何生成 StreamGraph 的
StreamGraphGenerator#generate() 生成 StreamGraph
如上流程图,关键的地方是看 translator 对 transformation 是如何 translateForStreaming 的,我们来看看 OneInputTransformationTranslator#translateForStreaming
,这也是 map、filter 操作对应的 Translator,如下:
最终,上例中的代码生成的 StreamGraph 可以用下面这个图来表示:
在上图中:
sourceVertex、targetVertex
来说明边是由哪个节点指向哪个节点、StreamPartitioner outputPartitioner
表示 source 节点的数据到 target 节点的是如何分区的对于 StreamGraph 及其生成,以下是一些 QA
Q:为什么上图中没有 id 为 3 的节点 ?
A:有些 transform 操作并不会生成 StreamNode。virtualSideoutputNodes、virtualPartitionNodes 这几类 transform 都会被处理成虚拟节点,当下游生成 StreamNode 后,发现上游为虚拟节点会找到虚拟节点的上游,并创建 StreamEdge 与虚拟节点上游的 transform 进行连接,并把虚拟节点信息写到 StreamEdge 中
Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes
中Q:StreamGraph 还包含哪些需要注意的东西(除了上述的拓扑关系)?
A:主要有:
StateBackend stateBackend
:表示要使用哪种类型的 StateBackend,是 memory 的还是 rocksdb 的Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts
:job 运行依赖的 jar、文件等信息Set<Integer> sources, Set<Integer> sinks
:表示输入(头结点)、输出(尾节点)的 ids