前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 是如何将你写的代码生成 StreamGraph 的 (上篇)

Flink 是如何将你写的代码生成 StreamGraph 的 (上篇)

作者头像
kk大数据
发布2021-03-13 22:25:13
1.2K0
发布2021-03-13 22:25:13
举报
文章被收录于专栏:kk大数据kk大数据

一、絮叨两句

新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ?

反正我立了,我今年给自己立的 FLAG 是大致阅读大数据几个框架的源码。

为什么要“大致”阅读,因为这些牛逼的框架都是层层封装,搞懂核心原理已经是很不易,更别谈熟读源码了。

但是目标还是要有的,我也不要当一条咸鱼。

之前几篇源码阅读的文章,不知道大家有没有亲自动手打开 Idea 去试一试,这里我再贴一下文章链接,大家可以再回顾一下。

阅读 Flink 源码前必知必会 - SPI 和 ClassLoader

阅读 Flink 源码前必会的知识 - 命令行解析库 Apache Commons Cli

Flink 源码阅读环境准备,并调试 Flink-Clients 模块

Flink Client 实现原理与源码解析(保姆级教学)

本次,我们来聊一聊,我们自己写的代码是如何变成 StreamGraph 的。

二、引出问题

开始之前,不妨稍微回顾一下......

一般我们执行一个 Flink 程序,都是使用命令行 flink run(flink 界面上执行的时候,也是在调用 flink run 命令来执行的)来执行,然后shell 会使用 java 命令,执行到 CliFrontend 类的 main 方法。

main 方法里面,首先会解析用户的输入参数,解析 flink-conf.yml 配置文件,解压出用户 jar 包里的依赖,以及其他的信息,都封装到 PackagedProgram 对象中。然后切换当前线程的类加载器为 UserCodeClassLoader,这个类加载器自定义了一些策略(Child-First 或者 Parent-First),使用这个类加载器去反射执行用户代码的 main 方法。

然后今天的故事就从这里开始。

首先我们贴一段 Flink 自带的 Example 里的代码(稍稍简化了代码,去掉了无关的逻辑):

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);

DataStream<Tuple2<String, Integer>> counts =
    text.flatMap(new Tokenizer())
    .keyBy(value -> value.f0).sum(1);

counts.print();

它是如何变成这张图的:

这张图是一个有向无环图,组成有向无环图的就是顶点信息,以及边的信息。

这些信息被封装在 StreamGraph 类之中,这个类中有三个非常重要的属性:

代码语言:javascript
复制
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;

可以看到这几个属性记录了这个 Graph 中有几个节点,几个是 sources,几个是 sinks。

其中 StreamNode 是对节点的封装,节点上有几个重要的属性如下:

代码语言:javascript
复制
private final String operatorName;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();

operatorName 表示节点的名字,inEdges 表示这个节点上游的边,outEdges 表示这个节点下游的边。

然后,StreamEdge 是对边的封装,只有输入节点 id 和目标节点 id:

代码语言:javascript
复制
private final int sourceId;
private final int targetId;

这三个类的这几个属性就描述了刚刚的那张图。

三、记住一个非常重要的属性

它就是 StreamExecutionEnvironment 类的 transformations 属性:

代码语言:javascript
复制
protected final List<Transformation<?>> transformations = new ArrayList<>();

什么是 Transformation,Transformation 就是 Flink 对我们写的算子的额外信息的封装,比如算子的名字,id,输出类型,输入,并行度等等这些信息。

有些算子最终会调用 this.tranformations.add() 加入到列表里来,而有的不会。

四、从 env.fromElements() 开始

env.fromElements(),这是一个算子,这个算子定义了 source 信息,这个算子对应的 transformation 是 LegacySourceTransformation,里面记录了算子的id,名字,输出类型,并行度,有界还是无界等等信息。

最后这个方法返回的是一个 DataStreamSource 对象,这个对象的基类是 DataStream。DataStream 里有一个 transformation 属性。

也就是说 env.fromElements() 返回了一个 DataStream 对象,并且把它自身的 transformation 信息放到这个 DataStream 实例的属性里面了。

env.fromElements 这个算子是没有加入到 上面的 transformations 列表中去的。

五、FlatMap 算子源码分析

紧接着,上面的 env.fromElements 的返回值:DataStream 实例,调用了它自己的 flatMap 方法,flatMap 最终又调用了 doTransform 方法。

FlatMap 算子也是要构造一个 transformation 的,FlatMap 对应的 transformation 是 OneInputTransformation,这个类里有一个属性是 input,也就是 FlatMap 算子的输入信息。我们看一下它的构造方法

代码语言:javascript
复制
public OneInputTransformation(
      Transformation<IN> input,
      String name,
      StreamOperatorFactory<OUT> operatorFactory,
      TypeInformation<OUT> outputType,
      int parallelism) {
   super(name, outputType, parallelism);
   this.input = input;
   this.operatorFactory = operatorFactory;
}

再看一下调用信息

代码语言:javascript
复制
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
      this.transformation,
      operatorName,
      operatorFactory,
      outTypeInfo,
      environment.getParallelism());

也就是说,FlatMap 的 transformation 信息中,有一个 input 属性,其值是 env.fromElements 的 transformation。

通俗点讲就是,FlatMap 的 transformation 中记录了它的输入是 env.fromElements() 。

最后返回了 SingleOutputStreamOperator 对象,这里面封装了 FlatMap 的 transformation 信息。

我们可以 debug 到这里来看看它的返回值:

然后需要关注的事情是,它最终调用了这个方法:

代码语言:javascript
复制
getExecutionEnvironment().addOperator(resultTransform);
代码语言:javascript
复制
public void addOperator(Transformation<?> transformation) {
   Preconditions.checkNotNull(transformation, "transformation must not be null.");
   this.transformations.add(transformation);
}

也就是加入到了 transformations 列表中去。

FlatMap 最后返回了一个 SingleOutputStreamOperator 类,这个类也是 DataStream 的子类。

所以,看到这基本能够理解,我们写的代码,其实本质都是 Flink 封装后对外暴露的简单易用的 api,Flink 在背后做了大部分事情。

六、KeyBy 算子源码分析

keyBy 也是 DataStream 的一个方法,它 new 了一个 KeyedStream,并且把 this 传入了构造函数中,this 是什么?this 就是刚刚 FlatMap 的返回值,还记得吗?里面记录了 FlatMap 的 transformation。

keyBy 对应的 transformation 是 PartitionTransformation,里面也有 input 属性,直接把 this.getTransformation() 传给了 input 了。

我们来 debug 看一下返回值:

有点像套娃,一层又一层的。

需要注意的是,KeyBy 只是一个虚拟的节点,它并没有加入到 transformations 列表中来。

七、sum 算子的源码分析

这个我们就不细看了,套路都差不多了,直接 debug 看一下返回值:

sum 算子有调用这个方法:

代码语言:javascript
复制
getExecutionEnvironment().addOperator(reduce);

加入到了 transformations 属性中来。

八、sink 算子的源码分析

和 sum 一样,我们直接 debug 一下最终的结果:

可见 sink 中,也套娃式的记录了所有的 input。

最后,sink 也调用了

代码语言:javascript
复制
getExecutionEnvironment().addOperator(sink.getTransformation());

九、生成 StreamGraph

这个生成的过程,就是递归遍历 transformations 列表中的每一个值及其输入,根据不同的情况,使用不同的逻辑来构建 StreamGraph。

(这个我们下次讲,哈哈!裤子都脱了,给我看这个?不要着急,慢慢来)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-02-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、絮叨两句
  • 二、引出问题
  • 三、记住一个非常重要的属性
  • 四、从 env.fromElements() 开始
  • 五、FlatMap 算子源码分析
  • 六、KeyBy 算子源码分析
  • 七、sum 算子的源码分析
  • 八、sink 算子的源码分析
  • 九、生成 StreamGraph
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档