前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】第二十五篇:源码角度分析作业提交逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

作者头像
章鱼carl
发布2022-03-31 11:19:29
8290
发布2022-03-31 11:19:29
举报
文章被收录于专栏:章鱼carl的专栏

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

继上篇 【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑 之后,我们从一个WordCount程序入手,探索了在调用execute提交作业之前的源码主线逻辑:经过DataStream API的一系列链式调用,得到一个重要的数据结构:List<Tansformation>。最终用户调用execute方法,这里传给execute的就是它。

本文继续讨论:在execute里,在最终将作业提交到集群后,在集群调度作业之前,Flink主要做了些什么。

同样,先将主要的结论列出来,以便在阅读源码时可以和笔者有一个基本一致的语境。

本文讨论的内容主要包含了两个阶段(例如,从yarn的per-job提交模式):

1. 运行flink.sh脚本,调起Flink Client(ClientFrontend),通过反射启动Jar中的main函数,生成StreamGraph、JobGraph,由PipelineExecutor提交给集群

2. 集群收到JobGraph,将JobGraph翻译成ExecutionGraph,然后开始调度执行,启动成功之后开始消费

DAG流转过程

JobManager的主要构成

yarn集群为例,

1. Dispacher:一个,提供Rest接口接收作业,不负责实际的调度执行

2. JobMaster:一个作业一个,负责作业调度、管理作业,Task生命周期

3. YarnResourceManager:一个,资源管理

Flink提交模式

  • 本地local:LocalExecutor。
  • session:AbstractSessionClusterExecutor,通过http协议提交作业。通过yarn-session.sh脚本启动,检查是否存在已经启动好的Flink Session模式集群,如果没有,则启动一个。然后在PipelineExecutor中通过Dsipatcher提供的Rest接口提交JobGraph,Dsipatcher为每个作业启动一个JobMaster,进入作业执行阶段。
  • per-job:AbstractJobClusterExecutor。在提交的时候创建集群,将JobGraph及其所需的文件等一同提交给Yarn集群,剩下的和yarn-session模式下一样。

yarn session提交流程

1. 启动集群

1) 使用yarn-session.sh提交会话模式的作业

2) 如果没有Flink Session集群,启动新的Flink Session集群

首先将应用配置和相关文件上传至HDFS;Yarn Client向Yarn提交创建Flink Session集群的申请,在分配的Containner中启动JobManager进程,并在其中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。

2. 作业提交

1) Flink Client通过Rest向Dsipatcher提交作业

2) 为作业创建一个JobMaster,构建ExecutionGraph

3. 作业调度执行

1) JobMaster向YarnResourceManager申请资源

2) YarnResourceManager如果没有可提供的slot则向Yarn的ResourceManager申请Containner,启动TaskManager

3) 在Yarn分配的Containner中启动新的TaskManager,并从HDFS上加载Jar所需资源

4) TaskManager启动之后,向YarnResourceManager注册自己和自己的slot资源情况

5) YarnResourceManager从等待队列取出JobMaster的slot请求,通知相应的TaskManager将slot分配给了哪些JobMaster

6) JobMaster将Task调度到该TaskManager的slot上

DAG流转细节

1. StreamGraph:

2. JobGraph

3. ExecutionGraph

源码分析

分析两部分:

1. 由flink shell 脚本 到 Flink作业 jar 的过程;

2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;

下面开始介绍,

1. 由flink shell 脚本 到 Flink作业 jar 的过程;

打开FLINK_HOME下的flink脚本,在最后一行可以看到flink本质是执行了一个org.apache.flink. client.cli.CliFrontend,

所以,我们从CliFrontend.java的main入口方法找起,

以上,主要做了:

1. 将本地配置文件及命令行配置项加载到全局配置中

2. 构造CliFrontend,运行它的parseAndRun

接着看parseAndRun,

以上,主要做了,

1. 取命令行输入参数的第一个动作action,这里我们以yarn-cluster方式提交,所以第一个单词是:flink

2. 以action进入开关语句,这里我们进入第一个分支:ACTION_RUN

接着进入run(params),

这里,主要有三个动作:

1. 得到依赖jar

2. 封装各种配置

3. 得到作业包,封装成PackagedProgram

顺着作业执行这条主线,可以追溯到callMainMethod方法

以上,通过java反射,从作业包的主类中拿main方法,并且调用main,从这里开始便进入了WordCount的main方法。

2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;


从env.execute("Window WordCount") 深入源码,

我们可以看到,在execute的第一层就进行了StreamGraph的绘制,继续深入,经过2次重构方法的调用,

这里我们看到,StreamGraphGenerator核心类是具体绘制StreamGraph的类。

进入generate,

这里一个非常关键的循环,在循环里对之前上一节输出的transformations列表进行遍历,对每个transformation进行transform转换操作,循环转换完后就返回产生的StreamGraph。我们接着看transform方法,

主要有三个步骤,

1. 判断是否已经转换过了,如果当前transform已经转换过,直接返回

2. 得到合适的translator,有哪些translator?如下,

3.如果得到translator,执行translate方法,没得到则执行遗留的legacyTransform

继续看translate方法,

1. 得到所有输入Id

2. 再次检查是否当前transform已经被转换过了

3. 调用translator对当前transform实例进行translator

这里有个很关键的地方,即getParentInputIds,

这里,对所有parentTransformations再次执行了之前已经阅读过的:transform,到这里显示形成了一个递归的逻辑调用,结合之前的调用很容易就总结到如下递归调用的意图:

起始,从transformations列表第一个transformation进行循环,每次都检查当前transformation的上游所有输入的transformations是否被处理过了,这种检查是一个递归的过程,并且结束条件是,当前transformation被包含在alreadyTransformed集合中了。

理解了整个递归处理transformations元素后,我们就可以进一步看看这个所谓的处理逻辑到底做了些什么。

我们找到translator的实现类AbstractOneInputTransformationTranslator,我们来看看这个UML类图,

从AbstractOneInputTransformationTranslator中即可找到这个很关键的方法,

终于到了StreamGraph的算法操作:

1. addOperator添加节点,节点对应transformation

2. addEdge添加边,包含上游所有输入边

综上,本质上就是一个利用递归操作绘制DAG的过程。

至于,StreamGraph如何转换为了JobGraph,而JobGraph由如何被JobManagerd的JobMaster转换为了可调度的ExecutionGraph,这里就不再赘述了,核心思想是一致的,只不过是为了使得这个DAG适合在相应的应用层面上而进行了一系列的丰富和优化,例如加入并行的概念,对齐进行OperatorChain的优化。

后续文章要讨论的是ExecutionGraph是如何被调度到集群上的TaskManager中执行的。。。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档