前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从头分析flink源码第三篇之jobGraph的生成

从头分析flink源码第三篇之jobGraph的生成

作者头像
山行AI
发布2021-07-01 15:34:14
1.7K1
发布2021-07-01 15:34:14
举报
文章被收录于专栏:山行AI山行AI

上一篇中我们分析了一个简单的flink wordcount程序由DataStream的transformation列表转换成StreamGraph的过程,紧接着上文的步骤,本文我们着重分析一下从streamGraph生成jobGraph的过程。

背景

上一篇中我们分析了StreamGraph的生成,StreamGraph的大致结构如下:

分析入口

对于stream模式的执行会在生成StreamGraph之后都会进入到StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):

代码语言:javascript
复制
@Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
        final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
        checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));
        CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(streamGraph, configuration, userClassloader);
        --------------省略部分代码----------------------

这里主要有两步操作:

•通过executorServiceLoader获取executorFactory,它是PipelineExecutorFactory类型的,主要为streamGraph的执行提供executor,它的实现类主要有以下几种:

•通过executorFactory获取executor,然后使用executor执行streamGraph,这里我们先以LocalExecutor为例。LocalExecutor#execute方法代码如下:

代码语言:javascript
复制
  @Override
      public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
          checkNotNull(pipeline);
          checkNotNull(configuration);
          // 有效的配置
          Configuration effectiveConfig = new Configuration();
          effectiveConfig.addAll(this.configuration);
          effectiveConfig.addAll(configuration);
          // we only support attached execution with the local executor.
          checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
          // 获取了jobGraph
          final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
          return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
      }

可以看到,这个方法中会选进行jobGraph的生成,然后将jobGraph提交到miniCluster上去进行下一步的操作。LocalExecutor的getJobGraph方法将是本文分析的重点。

LocalExecutor#getJobGraph

这里我们就不再一步步地贴代码了,直接来看整个调用链路:

StreamingJobGraphGenerator中包含streamGraph的全部信息,并最终由它来进行jobGraph的生成,我们接下来重点分析一下。

StreamingJobGraphGenerator#createJobGraph()方法

生成jobGraph的方法代码如下:

代码语言:javascript
复制
private JobGraph createJobGraph() {
        preValidate();
        // streaming模式下都是EAGER模式
        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());
        // 返回是否启用了近似本地恢复
        jobGraph.enableApproximateLocalRecovery(streamGraph
            .getCheckpointConfig()
            .isApproximateLocalRecoveryEnabled());
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        // hash值用来标识JobVertexID,以便在作业提交时识别节点(如果它们没有更改)。这里会按顺序遍历整个streamGraph,为所有StreamNode都生成一个唯一的hash值。
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
            streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        // 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
        // 输入新版本的hashes值和旧版本的legacyHashes
        // 设置算子链
        setChaining(hashes, legacyHashes);
        // 物理边,主要为Edge的目标顶点设置特理入边,并进入序列化。主要用有序的物理边及其typeNumber来控制多输入时InputGate的顺序;多输入的情况下typeNumber是在1的基础上以步长为1增加的
        setPhysicalEdges();
        // 设置slot sharing group和colocation
        setSlotSharingAndCoLocation();
        // 设置内存
        setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
        // 配置checkpoint
        configureCheckpointing();
        // 配置savepoint
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
        JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
        }
        return jobGraph;
    }

该方法比较长,我们拆开为以下几步来进行分析:

1.为每个StreamNode生成一个唯一的hash值;2.将能够chain在一起的StreamNode chain在一起,生成StreamConfig和JobVertex、JobEdge等信息;3.设置物理边信息;4.设置slot sharing group和co-location;5.设置内存、checkpoint、savepoint和其他用户自定义信息。

1. 为每个StreamNode生成一个唯一的hash值

目前分析的flink 1.12.0版本的源码中维护着两种hash值生成方案,一种是默认的hash值生成,另一种是用户自定义的。代码如下:

代码语言:javascript
复制
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
            streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        // 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

•defaultStreamGraphHasher是StreamGraphHasherV2类型的,如果用户指定了transformationId,会以transformationId的值来生成该节点的hash值;否则会使用节点的输入输出信息及输入节点的hash值来进行hash值的生成。这里会生成一份nodeId与节点hash值的映射关系;•legacyStreamGraphHashers在当前版本中为StreamGraphUserHashHasher,它主要是根据StreamNode中的userHash值来作为结果map的value。

2.设置算子链

从source节点为起点进行递归遍历来获取最终的jobGraph。我们来看一下代码:

代码语言:javascript
复制
/**
     * Sets up task chains from the source {@link StreamNode} instances.
     *
     * <p>This will recursively create all {@link JobVertex} instances.
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // we separate out the sources that run as inputs to another operator (chained inputs)
        // from the sources that needs to run as the main (head) operator.
        final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(
            hashes,
            legacyHashes);
        // 初始的值,内部是array copy,@see java.util.ArrayList.toArray()
        final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());

        // iterate over a copy of the values, because this map gets concurrently modified
        for (OperatorChainInfo info : initialEntryPoints) {
            createChain(
                // 从source节点开始
                info.getStartNodeId(),
                1,  // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
        }
    }

在buildChainedInputsAndGetHeadInputs方法内部会将source节点及ChainingStrategy为HEAD_WITH_SOURCES的节点生成OperatorChainInfo对象。然后进入到createChain方法,我们直接来看代码:

代码语言:javascript
复制
    private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
        Integer startNodeId = chainInfo.getStartNodeId();
        if (!builtVertices.contains(startNodeId)) {
            // 记录不能chain在一起的OperatorChainInfo的起始节点,为了后面OperatorChainInfo的connect做准备
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            //遍历当前节点的出边,放到chainableOutputs和nonChainableOutputs列表中
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
            //处理当前节点可以chain在一起的outputs
            for (StreamEdge chainable : chainableOutputs) {
                // 所有可以进行Chain的出边也都进行处理
                transitiveOutEdges.addAll(
                    createChain(
                        chainable.getTargetId(),
                        chainIndex + 1,
                        chainInfo,// 可以chain在一起的出边也会与它的source节点共用一个chainInfo
                        chainEntryPoints));
            }
            for (StreamEdge nonChainable : nonChainableOutputs) {// 不能chain在一起的在这里递归处理
                transitiveOutEdges.add(nonChainable);
                createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                        nonChainable.getTargetId(),
                        // 创建新的chainInfo,它的startId为其目标节点的id。因为它本身已经放到了transitiveOutEdges中
                        (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
            }
            // 节点id和链名称的映射
            chainedNames.put(
                currentNodeId,
                createChainedName(
                    currentNodeId,
                    chainableOutputs,
                    Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
            // 当前节点的资源情况
            chainedMinResources.put(
                currentNodeId,
                createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));
            // 当前的chainInfo,可能包括多个节点。把当前节点加入到chain中来(包括节点hashes和legacyHashes)
            OperatorID currentOperatorId = chainInfo.addNodeToChain(
                currentNodeId,
                chainedNames.get(currentNodeId));
            // 如果A->B可以chain在一起,最后的情况是B结点先运行到这里,然后A结点运行到这里。它们的startNodeId都是A节点的nodeId
            if (currentNode.getInputFormat() != null) {
                // 如果当前节点的inputFormat不为空,则创建formatContainer
                getOrCreateFormatContainer(startNodeId).addInputFormat(
                    currentOperatorId,
                    currentNode.getInputFormat());// input类型的,放入chainedInputOutputFormats中,在创䢖JobVertex时会有所不同
            }
            if (currentNode.getOutputFormat() != null) {
                // 如果有outPutFormat则指定outputFormat
                getOrCreateFormatContainer(startNodeId).addOutputFormat(
                    currentOperatorId,
                    currentNode.getOutputFormat());// output类型的,放入chainedInputOutputFormats中,在创建Jobvertex时会有所不同
            }
            // StreamConfig是可序列化的,如果当前节点为chain的头节点的话,则创建JobVertex,否则创建StreamConfig
            // 头节点运行到这里发生在同一个chain中的节点的后面;也就是说对每个链来说这一步是从后向前执行的
            StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());
            // 设置顶点配置,如果当前节点是startNode则会在上面一步中生成JobVertex并返回该vertex的StreamConfig,如果不是startNode则会在上一步中创建新的StreamConfig;头节点运行到这里发生在同一个chain中的节点的后面
            setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());
            if (currentNodeId.equals(startNodeId)) {
                // 如果当前节点就是chain的头节点
                config.setChainStart();
                // 设置chainIndex
                config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                for (StreamEdge edge : transitiveOutEdges) {
                    // 这里会连接上一个算子链(可能也只有一个节点,也可能是多个节点chain在一起的,如果该链是最下游的节点则无需进行connect)与下游的不能chain在一起的出边对应的node
                    connect(
                        startNodeId,
                        edge);// 主要是生成jobEdge,而且整个jobVertex的生成是从后向前的,到这个节点时证明它的下游节点的jobVertex都已经创建成功了。这里会生成jobEdge并放到下游节点的inputs列表中去。
                }
                // 设置它的有序输出边
                config.setOutEdgesInOrder(transitiveOutEdges);
                // 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
            } else {
                chainedConfigs.computeIfAbsent(
                    startNodeId,
                    k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);// 设置在链中的索引,只有非头节点会生成chainedConfigs内容,头节点会通过StreamConfig#setTransitiveChainedTaskConfigs将这些配置放到它的StreamConfig中去
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            // 设置operatorID,里面能取到该节点的primaryHash值
            config.setOperatorID(currentOperatorId);
            // chainableOutputs对应的节点就是当前chain的最终节点了,一个节点可能既是头节点又是结尾节点(上下都没有chain的情况)
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // 每一层往外返回的都是outEdge,最终返回的会是所有的StreamEdge列表
            return transitiveOutEdges;
        } else {
            return new ArrayList<>();
        }
    }

大致逻辑如下:

由于streamGraph的StreamNode之间是通过StreamEdge来连接的,所以这里对每个节点的出边进行迭代,把能chain在一起的chain在一起,然后生成JobVertex、JobEdge和IntermediateDataSet。

对于能chain在一起的节点,会延用其source节点的OperatorChainInfo并递增chainIndex向下递归,不能chain在一起的节点会新建OperatorChainInfo往下递归。

整个StreamNode链,递归的结果是从后往前处理剩余非递归部分的逻辑的。只会在每个子链的头结点的部分生成JobVertex和JobEdge及IntermediateDataSet;如果不是每个子链的startNode,则证明目前处理的还是每个子链的下游节点部分,会将该节点需要处理的信息放到StreamConfig对象中。如果是每个子链的startNode,则证明它的下游节点已经处理完成了,会生成JobVertex并将JobVertex节点信息放到StreamConfig中返回回来。

当递归处理时处理到的节点为每个子链的头节点时,会进行连接操作,将该节点与其下游子链的头节点进行连接,生成JobEdge和IntermediateDataSet(这部分的逻辑在StreamingJobGraphGenerator#connect方法中),这里会指定连接方式是POINTWISE还是ALL_TO_ALL。同时将当前建立连接的出边信息放到physicalEdgesInOrder列表中,这里维护的是有序的物理边,从后往前有序,在下文物理边的设置中会使用到。

需要注意的是chain在一起的节点的StreamConfig是怎么处理的,迭代过程中有个判断,如果是头节点的话会进行如下设置:

代码语言:javascript
复制
    // 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

    public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
        try {
            InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
        } catch (IOException e) {
            throw new StreamTaskException("Could not serialize configuration.", e);
        }
    }

chainedConfigs里面是存放所有能chain在一起的节点的容器,以startNodeId为key,非头节点的nodeId和StreamConfig的映射为value。这里会将这些信息存到头节点配置的chainedTaskConfig_中去。

这setChaining方法执行完之后,StreamingJobGraphGenerator中的属性大致如下图:

可以看到每个StreamNode都对应一个StreamConfig对象。节点4和节点5是chain在一起的,其中节点4为头节点。这一点在jobGraph中也有体现,在有chain在一起的JobVertex中有两个operatorId:

需要注意的是:这个operatorID主要是给checkpoint使用的。

到这里大家应该会有个疑问,那就是在生成OneInputStreamTask的时候,这个有两个operator chain在一起的这个JobVertexID最后是怎么识别另外一个算子操作的呢(在ExecutionGraph中也不涉及到这部分)?

这里我们简单地提一下,在StreamTask的beforeInvoke方法中会为每一个Task生成一个OperatorChain对象,在创建outputCollector时会根据chain的情况生成对应的operator放到allOperatorWrappers中。

关于这一点我们后续会用专门的篇幅来分析,这里顺带着提一下。这里会对当前task涉及到的operatorConfig生成对应的operator,将task的输入和输出进行一个串联,对于chain在一起的operator之间不会涉及数据的网络传输。

在setChaining操作之后生成的jobGraph主体结构如下图:

3. 设置物理边

直接来看代码:

代码语言:javascript
复制
private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
        // 有序的StreamEdge
        for (StreamEdge edge : physicalEdgesInOrder) {// 遍历有序的物理边集合
            int target = edge.getTargetId();
            List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(
                target,
                k -> new ArrayList<>());// 如果不存在返回新创建的list
            inEdges.add(edge);// 将边添加进去,形成targetId->edge集合之间的映射
        }
        // 为什么是从前往后找,为什么要找targetId?  因为下面要设置JobVertex顶点的streamConfig的InPhysicalEdges
        for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
            int vertex = inEdges.getKey();// 目标顶点id
            List<StreamEdge> edgeList = inEdges.getValue();// 相对于目标顶点来说这是入边
            // 物理边设置的还是StreamEdge
            // 对于多输入的task有用,主要用于确定输入的顺序,在jobEdge中是没办法确定这种顺序的
            vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);// 对目标顶点设置物理入边,序列化操作
        }
    }

这里会遍历physicalEdgesInOrder列表,它是一个有序的物理边集合,在wordcount实例中,它运行到这里时的实际值如下图:

顺序是从后往前。setPhysicalEdges方法的作用无非是向Flat Map节点的StreamConfig和Keyed Aggregation节点的StreamConfig中设置入边列表,并进行序列化操作。

4. 设置slot sharing group和co-location

这一部分会先进行region的划分(涉及到连通器算法),然后设置slot sharingGroup信息。再根据slot sharingGroup的信息对设置了coLocationGroupKey的节点进行coLocationGroups的划分。主要作用还是体现在slot资源调度上,关于这点的详细信息可以参考博客:http://chenyuzhao.me/2017/02/09/flink-scheduler/

5. 其他设置

进行内存、checkpoint、savepoint、用户文件等的设置,最终形成的jobGraph结构如下:

总结

通过与streamGraph的对比我们可以发现,jobGraph是对streamGraph进行了一定优化处理后的结果,如一些operator的chaining操作,slot sharing group与coLocationGroupKey的设置,JobVertex之间连接拓朴的变化(在JobEdge后面添加了IntermediateDataSet)等。

对于从StreamGraph到JobGraph的变化,主要总结如下(来自Jark's Blog的一段介绍[1],里面对为何flink要用多层图来进行任务处理也有详细解释):

•StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。•StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。•StreamEdge:表示连接两个StreamNode的边。•JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。•JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。•IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。•JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。

References

[1] Jark's Blog的一段介绍: http://wuchong.me/blog/2016/05/03/flink-internals-overview/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 分析入口
  • LocalExecutor#getJobGraph
  • StreamingJobGraphGenerator#createJobGraph()方法
    • 1. 为每个StreamNode生成一个唯一的hash值
      • 2.设置算子链
        • 3. 设置物理边
          • 4. 设置slot sharing group和co-location
            • 5. 其他设置
            • 总结
              • References
              相关产品与服务
              大数据
              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档