前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 作业生成②:StreamGraph -> JobGraph

Flink 作业生成②:StreamGraph -> JobGraph

作者头像
codingforfun
发布2021-05-27 10:53:34
1.2K0
发布2021-05-27 10:53:34
举报

由前文我们知道,StreamGraph 表示一个流任务的逻辑拓扑,可以用一个 DAG 来表示(代码实现上没有一个 DAG 结构),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。本文我们主要介绍一个 StreamGraph 是如何转换成一个 JobGraph。

一、JobGraph 概述

  • JobGraph 将会在原来的基础上做相应的优化(主要是算子的 Chain 操作,Chain 在一起的算子将会在同一个 task 上运行,会极大减少 shuffle 的开销)
  • JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(JobVertex)、中间结果(IntermediateDataSet)和边(JobEdge)组成的 DAG 图
  • JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置

为什么要有 StreamGraph 和 JobGraph 两层的 Graph,最主要的原因是为兼容 batch process,Streaming process 最初产生的是 StreamGraph,而 batch process 产生的则是 OptimizedPlan,但是它们最后都会转换为 JobGraph

1.1、JobVertex

JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode。主要成员:

  • List<OperatorIDPair> operatorIDs:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids
  • ArrayList<JobEdge> inputs:带输入数据的边列表
  • ArrayList<IntermediateDataSet> results:job 节点计算出的中间结果

1.2、IntermediateDataSet

它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集。中间数据集可能会被其他 operators 读取,物化或丢弃。主要成员:

  • JobVertex producer:该中间结果的生产者
  • List<JobEdge> consumers:该中间结果消费边,通过消费边指向消费的节点
  • ResultPartitionType resultType:中间结果的分区类型
    • 流水线的(有界的或无界的):一旦产生数据就向下游发送,可能是逐个发送的,有界或无界的记录流。
    • 阻塞:仅在生成完整结果时向下游发送数据。

1.3、JobEdge

它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。主要成员:

  • IntermediateDataSet sourc:边的源
  • JobVertex target:边的目标
  • DistributionPattern distributionPattern:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
    • ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
    • POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务

二、Create Job Graph 主要流程

2.1、核心步骤

2.2、setChaining

从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例

这个方法首先从会遍历这个 StreamGraph 的所有 source 节点,然后选择从 source 节点开始执行 createChain() 方法,在具体的实现里,主要逻辑如下

总结下这个流程:

  1. 从输入节点开始,判断边的输出节点能否加入到该 chain
    • 如果可以,则继续从输出节点执行扩展该 chain
    • 否则,当前 chain 结束,以输出节点为初始节点,递归创建新的 chain
  2. 如果当前节点为 chain 的首节点,那么就创建一个 JobVertex,否则创建 StreamConfig,记录到 chainedConfigs(由于调用链上后面的节点先创建,因此创建首节点的 JobVertex 时,就可以使用 chainedConfigs 记录的信息了)

其中 JobEdge 是通过下游 JobVertex 的 connectNewDataSetAsInput 方法来创建的,在创建 JobEdge 之前,会先用上游 JobVertex 创建一个 IntermediateDataSet 实例,用来作为上游 JobVertex 的结果输出,然后作为 JobEdge 的输入,构建JobEdge实例,具体实现如下:

public JobEdge connectNewDataSetAsInput(
      JobVertex input,
      DistributionPattern distPattern,
      ResultPartitionType partitionType) {
   /** 创建输入JobVertex的输出数据集合 */
   IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
   /** 构建 JobEdge 实例 */
   JobEdge edge = new JobEdge(dataSet, this, distPattern);
   /** 将 JobEdge 实例,作为当前 JobVertex 的输入 */
   this.inputs.add(edge);
   /** 设置中间结果集合 dataSet 的消费者是上面创建的 JobEdge */
   dataSet.addConsumer(edge);
   return edge;
}

通过上述的构建过程,就可以实现上下游 JobVertex 的连接,上游 JobVertex ——> 中间结果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex。其中:

  • IntermediateDataSet 和 JobEdge 是用来建立上下游 JobVertex 之间连接的配置
  • 一个 IntermediateDataSet 有一个 producer,可以有多个消费者 JobEdge
  • 一个 JobEdge 则有一个数据源 IntermediateDataSet,一个目标JobVertex
  • 一个 JobVertex 可以产生多个输出 IntermediateDataSet,也可以接受来自多个 JobEdge 的数据

2.3、算子 Chainable 的依据

isChainable() 的判断依据如下:

return downStreamVertex.getInEdges().size() == 1 // 
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 对应的 slotSharingGroup 一样
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允许 chain 操作
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允许跟后面的 chain 在一起
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 类型
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并发相等
            && streamGraph.isChainingEnabled(); // StreamGraph 允许 Chain 在一起

2.3.1、slotSharingGroup

一个 StreamNode 的 SlotSharingGroup 会按照下面这个逻辑来确定:

  1. 如果用户指定了 SlotSharingGroup,直接使用这个 SlotSharingGroup name;
  2. 如果所有的 input 都是同一个 group name,使用这个即可;
  3. 否则使用 default group;

2.3.2、edge.getPartitioner()

StreamPartitioner 的实现

用户可以在自己的代码中调用 DataStream API (比如:broadcast()shuffle() 等)配置相应的 StreamPartitioner,如果这个没有指定 StreamPartitioner 的话,则会走下面的逻辑创建默认的 StreamPartitioner:

//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的话,会为其选择 forward(并发设置相同时) 或 rebalance(并发设置不同时)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

三、参考

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、JobGraph 概述
    • 1.1、JobVertex
      • 1.2、IntermediateDataSet
        • 1.3、JobEdge
        • 二、Create Job Graph 主要流程
          • 2.1、核心步骤
            • 2.2、setChaining
              • 2.3、算子 Chainable 的依据
              • 三、参考
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档