
Analyzing the Flink Source Code from Scratch, Part 6: Generating the ExecutionGraph

Author: 山行AI | Published: 2021-09-14

Preface

In the previous article we traced how the TaskManager, JobManager and ResourceManager components are started during JobGraph submission. Continuing from there, this article analyzes the source-code path that turns a JobGraph into an ExecutionGraph.

When a DefaultScheduler is constructed, the constructor of its parent class SchedulerBase contains the following code:

    // create and restore the ExecutionGraph
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
        // the scheduling topology carries the partition and pipelined-region information
        this.schedulingTopology = executionGraph.getSchedulingTopology();

This article focuses on the execution logic behind these few lines to analyze how the ExecutionGraph is generated.

createAndRestoreExecutionGraph

Without further ado, here is the code:

    private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp) throws Exception {
        ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
        // the checkpoint coordinator
        final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
                // check whether we can restore from a savepoint
                tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
            }
        }
        return newExecutionGraph;
    }

This method does two things: 1. it creates the ExecutionGraph; 2. if the job was submitted with savepoint restore settings, it tries to restore the ExecutionGraph from that savepoint. Let's analyze each part in turn.
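Before diving in, here is a minimal, hedged sketch of how those savepoint restore settings end up on the JobGraph in the first place. SavepointRestoreSettings and JobGraph#setSavepointRestoreSettings are real Flink APIs; the helper class and method below are made up for illustration only:

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

    public class SavepointRestoreExample {
        /** Attaches savepoint restore settings to a JobGraph before it is submitted. */
        public static void attachSavepoint(JobGraph jobGraph, String savepointPath) {
            // forPath(path, allowNonRestoredState): restore from the given savepoint and
            // tolerate state that can no longer be mapped to any operator.
            SavepointRestoreSettings settings =
                    SavepointRestoreSettings.forPath(savepointPath, true);
            jobGraph.setSavepointRestoreSettings(settings);
            // tryRestoreExecutionGraphFromSavepoint(...) above later reads exactly these
            // settings via jobGraph.getSavepointRestoreSettings().
        }
    }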

The createExecutionGraph method

Let's look at the code first:

private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp) throws JobExecutionException, JobException {
        // execution deployment listener: records the actions taken while executions are deployed
        ExecutionDeploymentListener executionDeploymentListener = new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
        // listener for execution state updates
        ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {
            if (newState.isTerminal()) {
                executionDeploymentTracker.stopTrackingDeploymentOf(execution);
            }
        };
        // failover strategy used when task executions fail
        final FailoverStrategy.Factory failoverStrategy = legacyScheduling ?
            FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log) :
            // no-op when the new scheduler is in charge of failover
            new NoOpFailoverStrategy.Factory();
        // build the ExecutionGraph
        return ExecutionGraphBuilder.buildGraph(
            null,
            jobGraph,
            jobMasterConfiguration,
            futureExecutor,
            ioExecutor,
            slotProvider,
            userCodeLoader,
            checkpointRecoveryFactory,
            rpcTimeout,
            restartStrategy,
            currentJobManagerJobMetricGroup,
            blobWriter,
            slotRequestTimeout,
            log,
            shuffleMaster,
            partitionTracker,
            failoverStrategy,
            executionDeploymentListener,
            executionStateUpdateListener,
            initializationTimestamp);
    }

I. Input parameters

The method takes several parameters; let's go through them one by one.

1. currentJobManagerJobMetricGroup

The JobMaster constructor has the following parameter:

JobManagerJobMetricGroupFactory jobMetricGroupFactory

It is initialized in org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner via new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup) and is mainly used to record the JobManager's metrics for this job.

2. shuffleMaster

It coordinates the shuffle operations within the job.

The shuffleMaster is initialized in DefaultJobManagerRunnerFactory#createJobManagerRunner as follows:

final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

The ShuffleServiceFactory is loaded via SPI here, so users can plug in a custom implementation; Flink's default is NettyShuffleServiceFactory, and in the end NettyShuffleServiceFactory#createShuffleMaster is called:

    @Override
    public NettyShuffleMaster createShuffleMaster(Configuration configuration) {
        return NettyShuffleMaster.INSTANCE;
    }

As you can see, a singleton instance is returned here.
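As a hedged aside (not part of the original walkthrough), swapping in a custom shuffle service roughly amounts to pointing the configuration at a different factory class. The configuration key below is the one I believe ShuffleServiceLoader consults in this Flink version, and the factory class is purely hypothetical:

    import org.apache.flink.configuration.Configuration;

    public class CustomShuffleServiceExample {
        /** Builds a configuration that would make ShuffleServiceLoader pick a custom factory. */
        public static Configuration withCustomShuffleService() {
            Configuration configuration = new Configuration();
            // Defaults to NettyShuffleServiceFactory when this option is not set.
            configuration.setString(
                    "shuffle-service-factory.class",            // assumed option key
                    "com.example.MyShuffleServiceFactory");     // hypothetical implementation
            return configuration;
        }
    }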

3. partitionTracker

It is used to track the job's partitions and the resources (i.e. the TaskManagers) that host them.

A PartitionTrackerFactory for it is created in DefaultJobMasterServiceFactory#createJobMasterService when the JobMaster is built.

Then, in the JobMaster constructor, that factory is used to create the partitionTracker for the JobMaster:

    this.partitionTracker = checkNotNull(partitionTrackerFactory)
            .create(resourceID -> {
                Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(resourceID);
                if (taskManagerInfo == null) {
                    return Optional.empty();
                }
                return Optional.of(taskManagerInfo.f1);
            });

The lookup function handed to partitionTrackerFactory looks up registeredTaskManagers by resourceID and, if the TaskManager is registered, returns its gateway (the f1 field of the (TaskManagerLocation, TaskExecutorGateway) tuple).

4. executionDeploymentTracker

The executionDeploymentTracker is created in DefaultJobMasterServiceFactory#createJobMasterService simply via new DefaultExecutionDeploymentTracker(). It maintains and tracks which executions have been deployed and where.

5. initializationTimestamp

initializationTimestamp is the timestamp taken when org.apache.flink.runtime.dispatcher.Dispatcher#runJob is entered.

Next, let's look at ExecutionGraphBuilder#buildGraph. Since the method is fairly long, we analyze it piece by piece.

II. The buildGraph method

1. Creating the ExecutionGraph object
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
        // jobInformation holds the serialized execution config, the job configuration,
        // the blob keys of the user jars in the blobStore, the classpath entries, etc.
        final JobInformation jobInformation = new JobInformation(
            jobId, jobName,
            jobGraph.getSerializedExecutionConfig(), jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        // maximum number of prior execution attempts kept in history
        final int maxPriorAttemptsHistoryLength =
                jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
        // factory for the strategy that decides when intermediate result partitions are released
        final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
        // create a new execution graph, if none exists so far
        final ExecutionGraph executionGraph;
        try {
            // create the ExecutionGraph
            executionGraph = (prior != null) ? prior :
                new ExecutionGraph(
                    jobInformation, futureExecutor, ioExecutor, rpcTimeout, restartStrategy,
                    maxPriorAttemptsHistoryLength, failoverStrategyFactory, slotProvider, classLoader,
                    blobWriter, allocationTimeout, partitionReleaseStrategyFactory, shuffleMaster,
                    partitionTracker, jobGraph.getScheduleMode(), executionDeploymentListener,
                    executionStateUpdateListener, initializationTimestamp);
        } catch (IOException e) {
            throw new JobException("Could not create the ExecutionGraph.", e);
        }

The ExecutionGraph consists of many components, such as JobInformation, RestartStrategy, ClassLoader, PartitionReleaseStrategy, DefaultExecutionTopology, SlotProviderStrategy, JobMasterPartitionTracker, ResultPartitionAvailabilityChecker, CheckpointCoordinator, ShuffleMaster and so on. It also maintains all of the task information to be executed; some of the relevant fields are listed below:

    /** All job vertices that are part of this graph. */
    private final Map<JobVertexID, ExecutionJobVertex> tasks;

    /** All vertices, in the order in which they were created. **/
    private final List<ExecutionJobVertex> verticesInCreationOrder;

    /** All intermediate results that are part of this graph. */
    private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults;

    /** The currently executed tasks, for callbacks. */
    private final Map<ExecutionAttemptID, Execution> currentExecutions;

The ExecutionGraph constructor initializes this basic state; in the subsequent steps these fields are populated from the information carried by the JobGraph.

2. Filling the jsonPlan into the ExecutionGraph

Straight to the code:

executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

The JSON-formatted execution plan is generated here directly from the jobGraph.
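As a small, hedged aside: the plan stored here can later be read back from the graph, and it is the same JSON the web UI fetches for a job's plan view (assuming the executionGraph built above is in scope):

    // AccessExecutionGraph#getJsonPlan exposes the plan that was set above.
    String jsonPlan = executionGraph.getJsonPlan();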

3. Iterating over the JobVertices to run the master-side initialization logic

Straight to the code:

    for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId,
                        "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }
            try {
                vertex.initializeOnMaster(classLoader);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobId,
                        "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }

vertex.initializeOnMaster is a template-method hook: JobVertex implementations can use it to embed logic that needs to run on the JobMaster before the tasks are deployed.
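To make the hook concrete, here is a hedged sketch of what such an override could look like. Flink's own InputOutputFormatVertex overrides this method to configure input/output formats on the master; the subclass below is hypothetical and only illustrates the idea:

    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class PathCheckingVertex extends JobVertex {

        public PathCheckingVertex(String name) {
            super(name);
        }

        @Override
        public void initializeOnMaster(ClassLoader loader) throws Exception {
            // Runs on the JobMaster before any subtask is deployed, e.g. to validate or
            // create an output directory so that every parallel subtask finds it in place.
            // (The actual validation logic is omitted; this is only a sketch.)
        }
    }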

4. Generating the ExecutionGraph topology
// topologically sort the job vertices and attach the graph to the existing one
// the jobGraph keeps its taskVertices in a map, i.e. unordered; this call turns them into a topologically sorted list
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
// create the ExecutionJobVertices and IntermediateResults
executionGraph.attachJobGraph(sortedTopology);

The topology inside the ExecutionGraph is built from the jobGraph here. Let's go straight to the attachJobGraph method:

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
        assertRunningInJobMasterMainThread();
        LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} " +
                "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());
        final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
        final long createTimestamp = System.currentTimeMillis();
        for (JobVertex jobVertex : topologiallySorted) {
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }
            // turn the jobGraph vertex into an ExecutionJobVertex
            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(
                this,
                jobVertex,
                1,
                maxPriorAttemptsHistoryLength,
                rpcTimeout,
                globalModVersion,
                createTimestamp);
            // connect the ExecutionJobVertex to its input intermediateResults (ALL_TO_ALL or POINTWISE): ExecutionEdges are created and wired to the upstream nodes
            ejv.connectToPredecessors(this.intermediateResults);
            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format(
                    "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(),
                    ejv,
                    previousTask));
            }
            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(
                    res.getId(),
                    res);
                if (previousDataSet != null) {
                    throw new JobException(String.format(
                        "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(),
                        res,
                        previousDataSet));
                }
            }
            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }
        // build the execution topology and wire up the produced and consumed partitions between upstream and downstream
        // the topology assigning should happen before notifying new vertices to failoverStrategy
        executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
        failoverStrategy.notifyNewVertices(newExecJobVertices);
        partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
            getSchedulingTopology());
    }

By iterating over the topologically sorted list of JobVertices, this method builds the list of ExecutionJobVertices and the executionTopology. Let's look at these two parts separately.

From JobVertex to ExecutionJobVertex

The relevant part of the code is:

// turn the jobGraph vertex into an ExecutionJobVertex
            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(
                this,
                jobVertex,
                1,
                maxPriorAttemptsHistoryLength,
                rpcTimeout,
                globalModVersion,
                createTimestamp);
            // connect the ExecutionJobVertex to its input intermediateResults (ALL_TO_ALL or POINTWISE): ExecutionEdges are created and wired to the upstream nodes
            ejv.connectToPredecessors(this.intermediateResults);
            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format(---------));
            }
            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(
                    res.getId(),
                    res);
                if (previousDataSet != null) {
                    throw new JobException(---)
                }
            }
            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);

As the code shows, JobVertex and ExecutionJobVertex are in a one-to-one relationship. Just as the JobVertex keeps a list of IntermediateDataSets, the ExecutionJobVertex internally maintains a list of IntermediateResults, and the two lists contain the same number of elements.

ExecutionJobVertex

Next we step into the constructor of ExecutionJobVertex. Since it is long, we look at it in segments:

1. What determines the number of subtasks?

           // get the parallelism of the jobVertex; if none was set, the default parallelism is used
           int vertexParallelism = jobVertex.getParallelism();
           // determine the number of subtasks
           int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
           // maximum parallelism
           final int configuredMaxParallelism = jobVertex.getMaxParallelism();

           this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);
           // if no max parallelism was configured by the user, we calculate and set a default
           setMaxParallelismInternal(maxParallelismConfigured ?
                   configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

As the code shows, numTaskVertices, i.e. the number of subtasks, is determined by the parallelism of the JobVertex: the vertex's own parallelism if one was set, otherwise the default parallelism. A compact restatement of this decision follows below.
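This is only a restatement of the selection logic above in a compilable form; VALUE_NOT_SET is assumed to be the sentinel ExecutionJobVertex uses for an unconfigured max parallelism:

    public class ParallelismResolution {
        private static final int VALUE_NOT_SET = -1; // assumed sentinel value

        /** Number of ExecutionVertex instances (subtasks) created for one JobVertex. */
        static int resolveParallelism(int vertexParallelism, int defaultParallelism) {
            return vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        }

        /** Whether the user explicitly configured a max parallelism on the JobVertex. */
        static boolean isMaxParallelismConfigured(int configuredMaxParallelism) {
            return configuredMaxParallelism != VALUE_NOT_SET;
        }
    }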

2. Where does the list of IntermediateResults come from?

           // the intermediate results produced by this vertex; downstream, each of them may map to several partitions (one-to-many)
           // create the intermediate results
           this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
           for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
               final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
               // one entry in the list of intermediate results produced by this vertex
               this.producedDataSets[i] = new IntermediateResult(
                       result.getId(),
                       this,
                       numTaskVertices,
                       result.getResultType());
           }

As the code shows, the IntermediateResults correspond one-to-one to the IntermediateDataSets of the JobVertex: each IntermediateDataSet in the JobVertex's producedDataSets list has a matching IntermediateResult in the ExecutionJobVertex's producedDataSets array. An IntermediateResult, in turn, relates one-to-many to the partitions of the subtasks, and how those partitions are connected downstream is determined by whether the connection pattern is ALL_TO_ALL or POINTWISE.

3. How do ExecutionJobVertex and ExecutionVertex relate to each other?

      // create all task vertices
              for (int i = 0; i < numTaskVertices; i++) {
                  // one ExecutionVertex per unit of parallelism; each ExecutionVertex corresponds to
                  // one subtask, and all of them share the producedDataSets array
                  ExecutionVertex vertex = new ExecutionVertex(
                          this,
                          i,
                          producedDataSets,
                          timeout,
                          initialGlobalModVersion,
                          createTimestamp,
                          maxPriorAttemptsHistoryLength);
                  this.taskVertices[i] = vertex;
              }

From this snippet alone we can see that ExecutionJobVertex and ExecutionVertex are in a one-to-many relationship: the number of ExecutionVertices in each ExecutionJobVertex equals the number of subtasks determined above.

ExecutionVertex

Let's continue with the ExecutionVertex constructor, again looking at the implementation in segments.

1. IntermediateResultPartition

         this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
                 for (IntermediateResult result : producedDataSets) {
                     // one IntermediateResultPartition per IntermediateResult for this subTaskIndex: every
                     // ExecutionVertex has its own subTaskIndex, and for each IntermediateResult in
                     // producedDataSets it creates the partition belonging to that index
                     IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
                     // the IntermediateResult keeps a partitions array indexed by subTaskIndex
                     result.setPartition(subTaskIndex, irp);
                     resultPartitions.put(irp.getPartitionId(), irp);
                 }

The logic here is fairly clear. producedDataSets, as analyzed above, is the list of IntermediateResults maintained by the ExecutionJobVertex. Inside each ExecutionVertex constructor we iterate over producedDataSets, create one IntermediateResultPartition per IntermediateResult, store it at position subTaskIndex in that IntermediateResult's partitions array, and also register it in the vertex's resultPartitions map keyed by its partitionId.

2. The relationship between ExecutionVertex and ExecutionEdge

      this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];

Each ExecutionVertex maintains an inputEdges array, i.e. the edges leaving the IntermediateResultPartitions of the ExecutionVertices of the upstream ExecutionJobVertex.

Looking at the structure of IntermediateResultPartition: it keeps a reference to its producer, the ExecutionVertex that writes the data, and a consumers list holding the ExecutionEdges that leave this partition; those same edges serve as the input edges of the downstream ExecutionVertices.

Now let's return to the relevant piece of ExecutionGraph#attachJobGraph. After each ExecutionJobVertex is constructed, two things happen in order: first (1) connectToPredecessors is called, and then (2) the IntermediateResults it produces are registered in intermediateResults. Because the loop runs over the topologically sorted list of JobVertices, the ExecutionJobVertex of a source vertex is created first: for it, step (1) effectively does nothing since it has no predecessors, and step (2) puts its results into intermediateResults, where the downstream vertices will then find them, and so on through the graph.

We won't walk through ExecutionJobVertex#connectToPredecessors line by line here. In essence, it iterates over all input JobEdges of the current jobVertex and, according to the vertex's parallelism, connects the ExecutionVertices of the current ExecutionJobVertex to the incoming ExecutionEdges: these ExecutionEdges become the inputEdges of the downstream ExecutionVertices and, at the same time, the consumers of the upstream IntermediateResultPartitions. Note that the DistributionPattern of the incoming JobEdge determines how IntermediateResultPartitions and ExecutionEdges are connected, as the sketch below illustrates.
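The following is a simplified, self-contained sketch of that wiring rule, modeling only subtask and partition indices; the real logic lives in ExecutionVertex#connectSource and also handles the POINTWISE case with unequal parallelisms, which is omitted here:

    import java.util.ArrayList;
    import java.util.List;

    public class EdgeWiringSketch {

        /** ALL_TO_ALL: every consumer subtask reads every producer partition. */
        static List<Integer> allToAll(int numProducerPartitions) {
            List<Integer> connectedPartitions = new ArrayList<>();
            for (int p = 0; p < numProducerPartitions; p++) {
                connectedPartitions.add(p);
            }
            return connectedPartitions;
        }

        /** POINTWISE with equal parallelism: consumer subtask i reads exactly partition i. */
        static List<Integer> pointwiseEqualParallelism(int consumerIndex) {
            return List.of(consumerIndex);
        }
    }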

Execution

An Execution is the executable instance that is ultimately used when a subtask runs. It is created inside the ExecutionVertex constructor; let's look at the code:

// create the Execution: one execution attempt of a vertex. Although an ExecutionVertex can be executed multiple times (for recovery, re-computation, re-configuration), this object tracks the state of a single execution of that vertex and its resources.
        this.currentExecution = new Execution(
            getExecutionGraph().getFutureExecutor(),
            this,
            0,
            initialGlobalModVersion,
            createTimestamp,
            timeout);

This is the entry point for creating an Execution. Its constructor receives the executor, the associated ExecutionVertex, partition information, the acquired resource information and so on, all of which is needed when the subtask executes. After the Execution has acquired its resources, the partition information is registered and the resource information is filled in.

Generating the DefaultExecutionTopology

The topology is generated inside ExecutionGraph#attachJobGraph; straight to the code:

// build the execution topology and wire up the produced and consumed partitions between upstream and downstream
// the topology assigning should happen before notifying new vertices to failoverStrategy
executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

The logic of DefaultExecutionTopology#fromExecutionGraph is as follows:

    public static DefaultExecutionTopology fromExecutionGraph(ExecutionGraph executionGraph) {
        // the default execution topology
        final DefaultExecutionTopology topology = new DefaultExecutionTopology(executionGraph);
        // compute the pipelined regions
        final List<DefaultSchedulingPipelinedRegion> pipelinedRegions = generatePipelinedRegions(topology);
        topology.setPipelinedRegions(pipelinedRegions);
        ensureCoLocatedVerticesInSameRegion(pipelinedRegions, executionGraph);
        return topology;
    }

A DefaultExecutionTopology is generated from the executionGraph here, and the topology is then divided into pipelined regions.

The attentive reader may notice that PipelinedRegionComputeUtil#computePipelinedRegions is also invoked while the JobGraph is generated; its connected-components algorithm is what groups the vertices of the graph into sensible regions.
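To give a feel for that connected-components idea, here is a hedged, generic sketch: vertices joined by pipelined (non-blocking) edges end up in the same region, while blocking edges separate regions. This union-find structure is illustrative only and is not the actual PipelinedRegionComputeUtil implementation:

    import java.util.HashMap;
    import java.util.Map;

    public class PipelinedRegionSketch {
        private final Map<String, String> parent = new HashMap<>();

        private String find(String vertex) {
            parent.putIfAbsent(vertex, vertex);
            String root = parent.get(vertex);
            if (!root.equals(vertex)) {
                root = find(root);
                parent.put(vertex, root); // path compression
            }
            return root;
        }

        /** Merge the regions of producer and consumer when the connecting edge is pipelined. */
        public void addEdge(String producer, String consumer, boolean pipelined) {
            if (pipelined) {
                parent.put(find(producer), find(consumer));
            } else {
                find(producer); // blocking edge: just register both endpoints
                find(consumer);
            }
        }

        /** Two vertices belong to the same pipelined region iff they share a root. */
        public boolean sameRegion(String a, String b) {
            return find(a).equals(find(b));
        }
    }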

The DefaultExecutionTopology is the topology used for scheduling; internally it maintains the task topology information needed at execution time. We will look at its internals in a dedicated article later; this wraps up the ExecutionGraph part for now.

Summary

The ExecutionGraph is a rather complex structure, and this article could only walk through the overall flow without covering every detail. The parts skipped here will be analyzed in dedicated follow-up articles.

Finally, the transformation of a WordCount program from JobGraph to ExecutionGraph is summarized by the diagram that closes the original post.

Originally published on the WeChat public account 开发架构二三事, 2021-08-29.