前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

作者头像
章鱼carl
发布2022-03-31 11:18:44
5920
发布2022-03-31 11:18:44
举报
文章被收录于专栏:章鱼carl的专栏章鱼carl的专栏

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失

【Flink】第十二篇:记kudu-connector写CDC数据的-D数据时,报主键不存在的异常

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第十七篇:记一次牛轰轰的OOM故障排查

【Flink】第十九篇:从一个批量写HBase性能问题到一个Flink issue的距离

从本篇,笔者会从Flink Client开始,抽丝剥茧,循序渐进分析Flink的源码。受限于个人水平,必然会有很多错误,请私信我,以便对错误之处进行修改或说明。

本文从一个简单的WordCount程序入手,以DAG额绘制逻辑为线索,探索在执行execute提交作业之前的源码主线逻辑。

源码分析容易绕晕,所以,先将结论及分析线索进行一个说明,以期在读者脑海中铺设一个相似的语境,

核心抽象

主要涉及四个核心抽象:

  • DataStream:面向开发者。用户调用DataStream API的算子方法,将业务逻辑封装为Function传入算子。从用户角度,形式上来讲,是对DataStream进行链式调用,每次调用都是一次业务逻辑语义的表达,即将DataStream进行一次转换。这种面相DataStream的转换操作符合用户角度的调用习惯和思维方式,从用户角度来看DAG中的每个节点是一种DataStream。
  • Function:表达业务逻辑。用户面向DataStream表达的原始业务逻辑的封装。
  • Transformation:表达上下游关系,组织成流水线,面向内核。用户调用DataStream API进行数据处理的一系列逻辑,最终会转换为Transformation流水线。从Flink角度来看,Flink面向的DAG中各个节点是Transformation。
  • Operator:关注数据物理来源、序列化、数据转发、容错。Task包含一个或者多个算子,一个算子就是一个计算步骤,具体计算由算子中包含的Function来执行。
  • 关系:Transformation持有Operator,Operator持有Function。每个DataStream包含一个Transformation。调用DataStream API的算子处理流水线,最终会转换为Transformation流水线。

思维导图如下,

下面逐一简单介绍这四个核心抽象,以便于后面的源码分析。

DataStream


子类继承关系:

只有两个成员变量:

每个DataStream都有一个Transformation对象,表示该DataStream从上游DataStream使用该Transformation而来

Function

按照层次划分Function:

由于Function的实现子类很多,就不一一列举了。简单介绍一下各个层次下的Function的特点:

1. 无状态Function用来做无状态计算,使用比较简单。和RichFunction是一一对应的。如MapFunction和RichMapFunction。

2. RichFunction有俩方面增强:

  • 增加了open、close方法管理Function的生命周期。
  • 增加了getRuntimeContext

3. ProccessFunction可以访问三方面的构件块:

  • 事件(数据流元素)
  • 状态(容错和一致性)
  • 定时器(事件时间和处理时间)

Transformatio

数据转换(Transformation)衔接DataStream API和Flink内核。DataStream面向开发者,Transformation面向Flink内核,调用DataStream API的流水线最终会转换为Transformation流水线。Flink Client把Transformation流水线交给Environment,调用execute,在execute进一步将Transformation流水线转换为StreamGraph,接着再转换为JobGraph。

Transformation主要分为两类:

1. 物理Transformation:转换为实际运行的算子(Operator)

  • ReduceTransformation
  • BroadcastStateTransformation
  • SinkTransformation
  • TwoInputTransformation
  • LegacySourceTransformation
  • LegacySinkTransformation
  • TimestampsAndWatermarksTransformation
  • OneInputTransformation
  • AbstractMultipleInputTransformation
  • SourceTransformation

2. 虚拟Transformation:不会转换为实际运行的算子(Operator)

  • PartitionTransformation
  • UnionTransformation
  • SideOutputTransformation
  • FeedbackTransformation
  • CoFeedbackTransformation

Operator


算子(StreamOperator)关注数据物理来源、序列化、数据转发、容错。Task包含一个或者多个算子,一个算子就是一个计算步骤,具体计算由算子中包含的Function来执行。

StreamOperator的直接子类有:

  • MultipleInputStreamOperator
  • TwoInputStreamOperator
  • AbstractStreamOperatorV2
  • OneInputStreamOperator
  • AbstractStreamOperator

源码分析

从一个WordCount开始分析:

代码语言:javascript
复制
public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .socketTextStream("127.0.0.1", 5555)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

}

以上是一个WordCount Demo:

  1. 从Socket为Source Stream
  2. flatMap切分输入行为若干单词
  3. 按照单词进行keyBy
  4. 以5秒为窗口,进行在处理时间属性上的滚动开窗
  5. 进行sum聚合求和
  6. print输出聚合值到控制台

1. socketTextStream

从socketTextStream方法进入到了Environment中,经过几层简单的socketTextStream重载方法到了addSource方法:

这里将重载过程增加的默认参数,例如,delimiter,maxRetry一起打包生成一个SocketTextStreamFunction实例(Function)。并调用addSource进行添加。

addSource再次经过一系列的对方法参数的富化重载,最终到了最内层的addSource重载:

这个函数里主要逻辑:

  1. 入参检查
  2. 从SourceFunction类型抽取输出类型,这里实例是SocketTextStreamFunction,输出类型抽取的结果是String.class
  3. 由这个Function生成一个Operator实例(StreamSource)
  4. 由Operator实例生成一个DataStream类型的实例(DataStreamSource),并返回

这里貌似看起来和Transformation没啥关系,其实,在生成DataStreamSource的构造函数里我们可以看到端倪:

这里将operator封装成了Transformation,并调用父类构造器,最终在DataStream的构造器里这样进行了成员变量的初始化:

所以,经过以上对socketTextStream的函数调用栈分析,结论是最终返回了一个DataSteam实例,并且实例中持有两个重要的实例:tansformation、environment。tansformation是对SocketTextStreamFunction的封装,environment是用于持有上下文环境。

对于这个阶段的时序图如下,

2. flatMap

接着上面,从Demo中进行第二个链式调用的方法是flatMap,源码中同样是对flatMap进行了几次富参数化的重载,重载过程中同样是对数据类型进行了抽取:

以及对默认的转换名进行了添加,以及对Function封装为Operator(StreamFlatMap)

进入最内层的doTransform,

以上doTransform的主要逻辑如下,

  1. 由上游DataStream持有transformation抽出上游输出的类型,在这里是String.class,即为一行行的socket文本
  2. 由operator、上游transformation和上游输出类型以及并行度生成Transformation实例(OneInputTransformation)
  3. 由生成的Transformation实例和environment实例生成本次转换后的输出流:SingleOutputStreamOperator(DataStream),并最终返回这个DataStream
  4. 由当前DataStream得到持有的environment,将本次的Transformation实例添加到environment

我们来看看这个很重要的addOperator,

其实就是将本次的Transformation添加到一个被environment持有的List里面,

至此,我们总结一下在Demo中的第二个链式调用的操作flatMap里,Flink都做了些什么:由上游的DataStream得出上游的输出类型以及上游调用过的Transformation,再结合本次的Transformation,来生成本次的DataStream,当然同样要将environment给本次的DataStream。

还有一个重要的操作是将本次的Transformation添加到了environment的一个List结构的transformations里。

本次的调用时序图如下,

3. keyBy

接着上游的SingleOutputStreamOperator流,keyBy对齐进行了分组,我们进入keyBy一探究竟。keyBy里面看似很简单,直接返回一个KeyedStream,

我们进入KeyedStream的构造方法,发现是一系列构造函数的重载,调用栈如下,

在重载过程中富化了一系列入参,例如,分区函数StreamPartitioner,又有分区函数StreamPartitioner和用户分装的选择器KeySelector,生成transformation实例:PartitionTransformation,最终调用了父类DataStream的构造器。同样和之前一样也是将transformation和environment传给了构造的DataStream实例。

但是整个过程并没有将transformation添加到transformations的List里,因为keyBy只是一个虚操作,在前面我们已经强调过,PartitionTransformation属于虚拟Transformation,而不是物理Transformation,只有物理Transformation才会转换为真正的执行节点交给Flink去进行绘制DAG。

后面的window、sum、print就不再一一分析了,基本思想已经被以上的三种调用覆盖了:socketTextStream、flatMap、keyBy。

经过一系列的链式调用,最终用户调用execute方法提交作业,这里提交的最重要的数据结构就是List:tansformations。至于execute里面又对tansformations做了些什么,且听下回分解。。。。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档