Flink 作业在开发完毕之后,需要提交到 Flink 集群执行。ClientFronted 是入口,触发用户开发的 Flink 应用 Jar 文件中的 main 方法,然后交给 PipelineExecutor(流水线执行器,在 FlinkClient 升成 JobGraph 之后,将作业提交给集群的重要环节。)#execue 方法,最终会选择一个触发一个具体的 PiplineExecutor 执行。
提交模式又可分为:
启动集群:
作业提交:
Yarn 集群准备好后,开始作业提交。
1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。
2)Dispatcher 是 Rest 接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobManager(负责作业调度、管理作业和 Task 的生命周期),构建 ExecutionGraph(JobGraph 的并行化版本,调度层最核心的数据结构)。
这两个步骤结束后,作业进入调度执行阶段。
作业调度执行:
1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 执行,向 YarnResourceManager 申请资源;初次提交作业集群中尚没有 TaskManager,此时资源不足,开始申请资源。
2)YarnResourceManager 收到 JobManager 的资源请求,如果当前有空闲 Slot 则将 Slot 分配给 JobMaster.,否则 YarnResourceManager 将向 YarnMaster 请求创建 TaskManager。
3)YarnResourceManager 将资源请求加入到等待请求队列,并通过心跳向 Yarn RM 申请新的 Container 资源来启动 TaskManager 进程,Yarn 分配新的 Container 给 TaskManager。
4)YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需要的的相关资源,在容器中启动 TaskManager。
5)TaskManager 启动之后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResouceManager。
6)ResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。
7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。
启动集群:
作业提交:
该步骤与 Seesion 模式下的不同,Client 并不会通过 Rest 向 Dispatcher 提交 JobGraph,由 Dispatcher 从本地文件系统获取 JObGraph,其后的不好走与 Session 模式的一样
作业调度执行:
与 Yarn Session 模式下一致。
启动集群:
两个步骤完成之后,Session 模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有 JobManager、TaskManager,当作业需要执行时,才会按需创建。
作业提交:
两个步骤完成之后,作业进入调度执行阶段。
作业调度执行:
K8s Session 模式集群下,ResourceManager 向 k8sMaster 申请和释放 TaskManager,除此之外,作业的调度与执行和 Yarn 模式是一样的。
1)JobMaster 向 KubernetesResourceManager 请求 Slot。
2)KubernetesResourceManager 从 kubernetes 集群分配 TaskManager。每个 TaskManager 都是具有唯一标识的 Pod。KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件,里面有 Flink Master 的 service name 作为地址。这样在 FLInkMaster failover 之后,TaskManager 仍然可以重新连上。
3)Kubernetes 集群分配一个新的 Pod 后,在上面启动 TaskManager。
4)TaskManager 启动后注册到 SlotManager。
5)SlotManager 向 TaskManager 请求 Slot.
6)TaskManager 提供 Slot 给 JobMaster,然后任务就会被分配到这个 Slot 上运行。
使用 DataStreamAPI 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。
StreamGraph 在 FlinkClient 中生成,由 FlinkClient 在提交的时候触发 Flink 应用的 main 方法,用户编写的业务逻辑组装成 Transformation 流水线,在最后调用 StreamExecutionEnvironment.execute() 的时候开始触发 StreamGraph 构建。
StreamGraph 实际上是在 StreamGraphGenerator 中生成的,从 SinkTransformation(输出) 向前追溯到 SourceTransformation。在遍历过程中一边遍历一边构建 StreamGraph。
在遍历 Transformation 的过程中,会对不同类型的 Transformation 分别进行转换。对于物理 Transformation 则转换为 StreamNode 实体,对于虚拟 Transformation 则作为虚拟 StreamNode。
针对于某一种类型的 Transformation,会调用其相应的 transformxxx()函数进行转换。transfromxxx()首先转换上游 Transformation 进行递归转换,确保上游的都已经完成了转换。然后通过 addOperator()方法构造出 StreamNode,通过 addEdge()方法与上游的 transform 进行连接,构造出 StreamEdge。
在添加 StreamEdge 的过程中,如果 ShuffleMode 为 null,则使用 ShuffleMode PIPELINED 模式,在流计算中,只有 PIPLINED 模式才会在批处理中设计其他模式。构建 StreamEdge 的时候,在转换 Transformation 过程中生成的 虚拟 StreamNode 会将虚拟 StreamNode 的信息附着在 StreamEdge 上
虚拟的 Transformation 生成的时候不会转换为 SteramNode,而是添加为虚拟节点。
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
//当上游是sideoutput时,递归调用,并传入sideoutput信息
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
}
//当上游是select时,递归调用,并传入select信息
else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
//当上游是Partition时,递归调用,并传入Partition信息
else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
//不是以上逻辑转换的情况,真正构建StreamEdge
else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
//没有指定partitioner时,会为其选择forward或者rebalance
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
//创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
JobGraph 可以由流计算的 StreamGraph 和批处理的 OptimizedPlan 转换而来。流计算中,在 StreamGraph 的基础上进行了一些优化,如果通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网段的传递。
StreamingJobGraphGenerator 负责流计算 JobGraph 的生成,在转换前需要进行一系列的预处理。
private JobGraph createJobGraph() {
preValidate();
// make sure that all vertices start immediately
//设置调度模式
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
//为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
//为了向后保持兼容,为每个节点生成老版本的hash id
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
//真正对SteramGraph进行转换,生成JobGraph图
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
//设置共享slotgroup
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getMinResources(),
id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
//配置checkpoint
configureCheckpointing();
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
//如果有之前的缓存文件的配置,则重新读入
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
//设置执行环境配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
预处理完毕后,开始构建 JobGraph 中的点和边,从 Source 向下遍历 StreamGraph,逐步创建 JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。
执行具体的 Chain 和 JobVertex 生成、JobEdge 的关联、IntermediateDataSet。从 StreamGraph 读取数据的 StreamNode 开始,递归遍历同时将 StreamOperator 连接在一起。
整理构建的逻辑如下(看上图!!!):
1)从 Source 开始,Source 与下游的 FlatMap 不可连接,Source 是起始节点,自己成为一个 JobVertx。
2)此时开始一个新的连接分析,FlatMap 是起始节点,与下游的 KeyedAgg 也不可以连接,那么 FlatMap 自己成为一个 JobVertex。
3)此时开始一个新的连接分析。KeyedAgg 是起始节点,并且与下游的 Sink 可以连接,那么递归地分析 Sink 节点,构造 Sink 与其下游是否可以连接,因为 Slink 没有下游,所以 KeyedAgg 和 Sink 节点连接在一起,共同构成了一个 JobVertex。在这个 JobVertex 中,KeyedAgg 是起始节点,index 编号为 0,sink 节点 index 编号为 1.
构建 JobVertex 的时候需要将 StreamNode 中的重要配置信息复制到 JobVertex 中。构建好 JobVertex 之后,需要构建 JobEdge 将 JobVertex 连接起来。KeyedAgg 和 Sink 之间构成了一个算子连接,连接内部的算子之间无序构成 JobEdge 进行连接。
在构建 JobEdge 的时候,很重要的一点是确定上游 JobVertex 和下游 JobVertex 的数据交换方式。此时根据 ShuffleMode 来确定 ResultPartition 类型,用 FlinkPartition 来确定 JobVertex 的连接方式。
Shuffle 确定了 ResultPartition,那么就可以确定上游 JobVertex 输出的 IntermediateDataSet 的类型了,也就知道 JobEdge 的输入 IntermediateDataSet。
ForwardPartitioner 和 RescalePartitioner 两种类型的 Partitioner 转换为 DistributionPattern.POINTWISE 的分发模式。其他类型的 Partitioner 统一转换为 DistributionPattern.ALL_TO_ALL 模式。
JobGraph 的构建和 OperatorChain 优化:
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
//获取当前节点的出边,判断是否符合OperatorChain的条件
//分为两类:chainableoutputs,nonchainableoutputs
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//对于chainable的边,递归调用createchain
//返回值添加到transitiveOutEdges中
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
//对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点
//进行递归调用,返回值添加到transitiveOutEdges中
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
//如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfig
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
//将StreamNode中的配置信息序列化到Streamconfig中。
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
//再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset
//否则将当前节点的StreamConfig 添加到chainedConfig中。
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setOutEdgesInOrder(transitiveOutEdges);
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
一个 Operatorchain 在同一个 Task 线程内执行。OperatorChain 内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量
operatorchain 的条件:
1)下游节点的入度为 1
2)SteramEdge 的下游节点对应的算子不为 null
3)StreamEdge 的上游节点对应的算子不为 null
4)StreamEdge 的上下游节点拥有相同的 slotSharingGroup,默认都是 default.
5)下游算子的连接策略为 ALWAYS.
6)上游算子的连接策略为 ALWAYS 或者 HEAD.
7)StreamEdge 的分区类型为 ForwardPartitioner
8)上下游节点的并行度一致
9)当前 StreamGraph 允许 chain
初始话作业调度器的时候,根据 JobGra ph 生活 ExecutionGraph。在 SchedulerBase 的构造方法中触发构建,最终调用 SchedulerBase#createExecutionGraph 触发实际的构建动作,使用 ExecutionGraphBuiler 构建 ExecutionGraph。
核心代码 attachJobGraph:
构建 ExecutionEdge 的连接策略:
b:每个 Task 消费的上游分区结果数据不均,如上游有 3 个结果分区,下游有两个 Task,那么一个 Task 分配 2 个结果分区消费,另一个分配一个结果分区消费。
3)一对多连接:下游的 Task 数量多余上游的分区数,此时两种情况:
a:每个结果分区的下游消费 Task 数据量相同,如上游有两个结果分区,下游有 4 个 Task,每个结果分区被两个 Task 消费。
b:每个结果分区的下游消费 Task 数量不相同,如上游有两个结果分区,下游有 3 个 Task,那么一个结果分区分配 2 个 Task 消费,另一个结果分区分配一个 Task 消费。