Analyzing the Flink Source Code from Scratch, Part 5: What Happens Inside Each Component When a JobGraph Is Submitted?

Author: 山行AI
Published: 2021-09-14 12:33:40
Column: 山行AI

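The previous articles in this series walked through how a Flink WordCount job produces its StreamGraph and JobGraph. This article picks up once the JobGraph has been generated and moves toward ExecutionGraph generation and job submission. It focuses on what the individual components (TaskManager, ResourceManager, JobManager and so on) do while the job is being submitted. Only the internals of running WordCount locally are covered; submission under other resource-management modes such as YARN or Kubernetes is left to later articles. Be warned: the article is long and code-heavy.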

Continuing from the previous articles, we had reached org.apache.flink.client.deployment.executors.LocalExecutor#execute:

@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);
    // Build the effective configuration
    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    // Build the JobGraph
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

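This method merges the executor's own configuration with the one passed in to form the effective configuration, checks that attached execution is enabled, builds the JobGraph, and then submits it. Next is org.apache.flink.client.program.PerJobMiniClusterFactory#submitJob: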

    public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
        MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
        MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
        miniCluster.start();
        return miniCluster
            .submitJob(jobGraph)
            .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
                org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
                    () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
                    () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
                    userCodeClassloader);
                return submissionResult;
            }))
            .thenApply(result -> new MiniClusterJobClient(
                    result.getJobID(),
                    miniCluster,
                    userCodeClassloader,
                    MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    // We failed to create the JobClient and must shutdown to ensure cleanup.
                    shutDownCluster(miniCluster);
                }
            })
            .thenApply(Function.identity());
    }

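This method does the following:

1. Starts the MiniCluster.
2. Submits the JobGraph to the MiniCluster.
3. Once submission succeeds, waits for job initialization to finish and passes on the submissionResult.
4. Wraps the result in a MiniClusterJobClient.
5. If an exception occurred along the way, shuts the MiniCluster down.
6. Otherwise, returns the client produced in step 4.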
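The submit-then-clean-up-on-failure shape of this chain can be sketched with plain JDK futures. This is only an illustration: FakeCluster and the string "client" are hypothetical stand-ins, not Flink's MiniCluster or MiniClusterJobClient.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of "start, submit, wrap the result, shut down on failure".
// All names here are hypothetical stand-ins, not Flink classes.
class SubmitPipelineSketch {

    static final class FakeCluster {
        final AtomicBoolean running = new AtomicBoolean(false);

        void start() { running.set(true); }

        void shutDown() { running.set(false); }

        // Fails for an empty job name, otherwise returns a made-up job id.
        CompletableFuture<String> submit(String job) {
            if (job.isEmpty()) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalArgumentException("empty job"));
                return failed;
            }
            return CompletableFuture.completedFuture("job-id-of-" + job);
        }
    }

    static CompletableFuture<String> submitJob(FakeCluster cluster, String job) {
        cluster.start();
        return cluster.submit(job)
            // Wrap the submission result in a "client".
            .thenApply(jobId -> "client(" + jobId + ")")
            // Runs on success and on failure; only the failure path cleans up.
            .whenComplete((client, error) -> {
                if (error != null) {
                    cluster.shutDown();
                }
            });
    }

    public static void main(String[] args) {
        FakeCluster ok = new FakeCluster();
        System.out.println(submitJob(ok, "wordcount").join());
        System.out.println("still running: " + ok.running.get());

        FakeCluster bad = new FakeCluster();
        System.out.println("failed: " + submitJob(bad, "").isCompletedExceptionally());
        System.out.println("still running: " + bad.running.get());
    }
}
```

Note that whenComplete does not swallow the exception: the returned future still fails, so the caller sees the error after the cluster has been shut down.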

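The chain above also shows how neatly CompletableFuture composes asynchronous steps. What we care about here, though, is what the core components (TaskManager, JobManager, ResourceManager) do. MiniCluster#start handles the TaskManager and ResourceManager logic, while the JobManager logic lives mainly in the job submission path.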

TaskManager

MiniCluster#start calls startTaskManagers. That method was analyzed in an earlier article, "flink taskmanager启动篇" (on TaskManager startup).

Here is the overall call flow:

startTaskManagers() => startTaskExecutor() => taskExecutor.start() => TaskExecutor#onStart =>
startTaskExecutorServices() => resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()) => StandaloneLeaderRetrievalService#start => listener.notifyLeaderAddress(leaderAddress, leaderId) =>

Once the ResourceManager leader has been elected, the listener is notified through a callback, which leads into org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress:

    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        // Executed asynchronously, triggered via RPC
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

Next, org.apache.flink.runtime.taskexecutor.TaskExecutor#notifyOfNewResourceManagerLeader:

    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        // Reconnect to the ResourceManager
        reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
    }

This is where the connection to the ResourceManager gets established:

private void reconnectToResourceManager(Exception cause) {
        // Close the previous ResourceManager connection
        closeResourceManagerConnection(cause);
        startRegistrationTimeout();
        tryConnectToResourceManager();
    }

    private void tryConnectToResourceManager() {
        if (resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

private void connectToResourceManager() {
        assert(resourceManagerAddress != null);
        assert(establishedResourceManagerConnection == null);
        assert(resourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

        final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
            getAddress(),
            getResourceID(),
            unresolvedTaskManagerLocation.getDataPort(),
            JMXService.getPort().orElse(-1),
            hardwareDescription,
            memoryConfiguration,
            taskManagerConfiguration.getDefaultSlotResourceProfile(),
            taskManagerConfiguration.getTotalResourceProfile()
        );

        resourceManagerConnection =
            new TaskExecutorToResourceManagerConnection(
                log,
                getRpcService(),
                taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                resourceManagerAddress.getAddress(),
                resourceManagerAddress.getResourceManagerId(),
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener(),
                taskExecutorRegistration);
        resourceManagerConnection.start();
    }

resourceManagerConnection.start is called at the end, and there is more going on inside that method than its name suggests.

The part to focus on there is the createNewRegistration method.

Once the connection to the ResourceManager succeeds, onRegistrationSuccess is called back. Its implementation in org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection is:

    @Override
    protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
        log.info("Successful registration at resource manager {} under registration id {}.",
            getTargetAddress(), success.getRegistrationId());
        registrationListener.onRegistrationSuccess(this, success);
    }

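From there control passes to TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess.

It then enters org.apache.flink.runtime.taskexecutor.TaskExecutor#establishResourceManagerConnection.

That method does two main things: 1. it registers with the ResourceManager and reports slot information; 2. it sets up heartbeat monitoring with the ResourceManager.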

ResourceManager

MiniCluster#start contains this snippet:

setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);

Inside org.apache.flink.runtime.minicluster.MiniCluster#setupDispatcherResourceManagerComponents there is this piece of logic:

    dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));

createDispatcherResourceManagerComponents is where the components are actually created.

In org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create, the resourceManagerFactory is used to build the ResourceManager, which is then started.

The overall call flow is:

resourceManager#start => ResourceManager#onStart => ResourceManager#startResourceManagerServices => leaderElectionService.start(this) => EmbeddedLeaderElectionService#start =>

Here is org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.EmbeddedLeaderElectionService#start:

    @Override
    public void start(LeaderContender contender) throws Exception {
        checkNotNull(contender);
        // Register the contender
        addContender(this, contender);
    }

    private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
        synchronized (lock) {
            checkState(!shutdown, "leader election service is shut down");
            checkState(!service.running, "leader election service is already started");

            try {
                // Throw if the service has already been added
                if (!allLeaderContenders.add(service)) {
                    throw new IllegalStateException("leader election service was added to this service multiple times");
                }

                service.contender = contender;
                service.running = true;

                updateLeader().whenComplete((aVoid, throwable) -> {
                    if (throwable != null) {
                        fatalError(throwable);
                    }
                });
            }
            catch (Throwable t) {
                fatalError(t);
            }
        }
    }

Next comes org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService#updateLeader.

From there, org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.GrantLeadershipCall#run is executed.

That in turn enters org.apache.flink.runtime.resourcemanager.ResourceManager#grantLeadership.

This is the callback invoked once the ResourceManager leader has been determined.
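The register-a-contender-then-call-it-back flow can be sketched as follows. This is a simplified model with hypothetical names: it grants leadership synchronously to the first contender, whereas the embedded service shown above routes the callback through updateLeader and a GrantLeadershipCall.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified model of an in-process leader election: the first registered
// contender becomes leader and is told so through a callback.
// Hypothetical names; not Flink's EmbeddedLeaderService.
class EmbeddedElectionSketch {

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    static final class ElectionService {
        private final Object lock = new Object();
        private final List<Contender> contenders = new ArrayList<>();
        private Contender currentLeader;

        void start(Contender contender) {
            synchronized (lock) {
                // Registering the same contender twice is a programming error.
                if (contenders.contains(contender)) {
                    throw new IllegalStateException("contender was added multiple times");
                }
                contenders.add(contender);
                updateLeader();
            }
        }

        private void updateLeader() {
            // Only elect when there is no leader yet.
            if (currentLeader == null && !contenders.isEmpty()) {
                currentLeader = contenders.get(0);
                currentLeader.grantLeadership(UUID.randomUUID());
            }
        }
    }

    static final class RecordingContender implements Contender {
        UUID grantedSession;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            grantedSession = leaderSessionId;
        }
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService();
        RecordingContender first = new RecordingContender();
        RecordingContender second = new RecordingContender();
        service.start(first);
        service.start(second);
        System.out.println("first is leader: " + (first.grantedSession != null));
        System.out.println("second is leader: " + (second.grantedSession != null));
    }
}
```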

JobManager

Now org.apache.flink.runtime.minicluster.MiniCluster#submitJob:

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

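The steps in this method are:

1. Obtain the DispatcherGateway through the dispatcherGatewayRetriever; the result is dispatcherGatewayFuture.
2. Obtain the blobServerAddress through the dispatcherGateway.
3. Upload the job's jar files using the blob server address and the jobGraph.
4. Submit the jobGraph through the dispatcherGateway that dispatcherGatewayFuture yields.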
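The thenCombine followed by thenCompose(Function.identity()) in step 4 is worth a closer look: the combining function itself returns a future, so the result is a nested future that has to be flattened. A minimal sketch, with a hypothetical Gateway interface standing in for DispatcherGateway:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch of "wait for two futures, then call an async method and flatten".
// Gateway is a hypothetical stand-in for DispatcherGateway.
class CombineComposeSketch {

    interface Gateway {
        CompletableFuture<String> submit(String job);
    }

    static CompletableFuture<String> submit(
            CompletableFuture<Void> uploadFuture,
            CompletableFuture<Gateway> gatewayFuture,
            String job) {
        // thenCombine waits for both inputs; because gateway.submit is itself
        // asynchronous, the combined result is a future of a future.
        CompletableFuture<CompletableFuture<String>> nested =
            uploadFuture.thenCombine(gatewayFuture, (ignored, gateway) -> gateway.submit(job));
        // thenCompose with the identity function unwraps the inner future.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> upload = CompletableFuture.completedFuture(null);
        CompletableFuture<Gateway> gateway =
            CompletableFuture.completedFuture(job -> CompletableFuture.completedFuture("ack:" + job));
        System.out.println(submit(upload, gateway, "wordcount").join());
    }
}
```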

Next is org.apache.flink.runtime.dispatcher.Dispatcher#submitJob:

@Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        try {
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
            } else if (isPartialResourceConfigured(jobGraph)) {
                return FutureUtils.completedExceptionally(
                    new JobSubmissionException(
                        jobGraph.getJobID(),
                        "Currently jobs is not supported if parts of the vertices have " +
                            "resources configured. The limitation will be removed in future versions."));
            } else {
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

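• The jobId is used to check whether this job has already been submitted or executed; a job in which only some of the vertices have resources configured is rejected as well.
• Otherwise control passes to internalSubmitJob, which performs the actual submission.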

Here is the code of org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob:

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(
            jobGraph.getJobID(),
            jobGraph,
            this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());
        // Handle the result
        return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
            if (throwable != null) {
                cleanUpJobData(jobGraph.getJobID(), true);
                ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(
                    throwable);
                log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(
                        jobGraph.getJobID(),
                        "Failed to submit job.",
                        strippedThrowable));
            } else {
                return acknowledge;
            }
        }, ioExecutor);
    }

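This method calls persistAndRunJob to submit the job. Note that this step runs asynchronously on the fencedMainThreadExecutor, whereas the result handling in handleAsync above runs on ioExecutor.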
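The error handling in handleAsync follows a common pattern: strip the CompletionException wrapper, wrap the real cause in a domain exception, and rethrow. A sketch under the assumption of a hypothetical SubmissionException and a hand-written strip helper (Flink's own ExceptionUtils is not used here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Sketch of translating any failure into a single domain exception inside
// handleAsync. SubmissionException and strip(...) are hypothetical helpers.
class HandleAsyncSketch {

    static final class SubmissionException extends RuntimeException {
        SubmissionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Unwraps nested CompletionExceptions to reach the original failure.
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    static CompletableFuture<String> submit(CompletableFuture<Void> persistAndRun, Executor ioExecutor) {
        return persistAndRun
            .thenApply(ignored -> "ACK")
            // handleAsync sees both outcomes; the callback runs on ioExecutor.
            .handleAsync((ack, error) -> {
                if (error != null) {
                    throw new CompletionException(
                        new SubmissionException("Failed to submit job.", strip(error)));
                }
                return ack;
            }, ioExecutor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the callback on the calling thread
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println(submit(ok, direct).join());

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("disk full"));
        try {
            submit(failed, direct).join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage() + " <- " + e.getCause().getCause().getMessage());
        }
    }
}
```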

Moving on to org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob:

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

jobGraphWriter persists the jobGraph to ZooKeeper or to the Kubernetes leader node. We are in standalone mode here, so this step does nothing. On to runJob:

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
        long initializationTimestamp = System.currentTimeMillis();
        // Create the JobManagerRunner
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(
            jobGraph,
            initializationTimestamp);
        DispatcherJob dispatcherJob = DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);
        runningJobs.put(jobGraph.getJobID(), dispatcherJob);
        // ... code that handles the future's result is omitted here ...
    }

The core logic is in createJobManagerRunner; dispatcherJob and the code after it mainly deal with the result of jobManagerRunnerFuture. So let us look at createJobManagerRunner:

    CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph,
        long initializationTimestamp) {
        final RpcService rpcService = getRpcService();
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // Create the JobManagerRunner
                    JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                    // Start the runner
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(new JobInitializationException(
                        jobGraph.getJobID(),
                        "Could not instantiate JobManager.",
                        e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
    }
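The comment on ioExecutor in the snippet above is the key design point: construction can be slow, so it is pushed off the main thread, and checked exceptions are wrapped so the future fails cleanly. A sketch of that idea with a hypothetical createAsync helper and a named single-thread pool standing in for the IO executor:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of running a possibly slow factory on a separate executor so the
// caller's thread is never blocked. createAsync is a hypothetical helper.
class IoExecutorSketch {

    static <T> CompletableFuture<T> createAsync(Callable<T> factory, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return factory.call();
            } catch (Exception e) {
                // Suppliers cannot throw checked exceptions, so wrap them.
                throw new CompletionException(
                    new IllegalStateException("Could not instantiate runner.", e));
            }
        }, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            // The factory reports which thread it ran on.
            String where = createAsync(() -> Thread.currentThread().getName(), io).join();
            System.out.println("runner created on " + where);
        } finally {
            io.shutdown();
        }
    }
}
```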

Here is the body of org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory#createJobManagerRunner, pasted as-is:

@Override
    public JobManagerRunner createJobManagerRunner(
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            JobManagerSharedServices jobManagerServices,
            JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
            FatalErrorHandler fatalErrorHandler,
            long initializationTimestamp) throws Exception {
        // Read the JobMaster configuration
        final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
        // Obtain the SlotPoolFactory, which handles slot management
        final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
        // Obtain the scheduler factory
        final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
        // Create the ShuffleMaster that handles shuffle operations
        final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
        // Create the JobMasterServiceFactory
        final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
            jobMasterConfiguration,
            slotPoolFactory,
            rpcService,
            highAvailabilityServices,
            jobManagerServices,
            heartbeatServices,
            jobManagerJobMetricGroupFactory,
            fatalErrorHandler,
            schedulerNGFactory,
            shuffleMaster);
        // Create the JobManagerRunnerImpl instance
        return new JobManagerRunnerImpl(
            jobGraph,
            jobMasterFactory,
            highAvailabilityServices,
            jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
            jobManagerServices.getScheduledExecutorService(),
            fatalErrorHandler,
            initializationTimestamp);
    }

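The main operations in this method are:

1. Read the JobMaster configuration and build the JobMasterConfiguration object.
2. Create the SlotPoolFactory from the configuration.
3. Obtain the SchedulerNGFactory.
4. Create the ShuffleMaster that handles shuffle operations.
5. Create the JobMasterServiceFactory.
6. Create the JobManagerRunnerImpl instance.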

Next is the JobManagerRunnerImpl constructor, again pasted as-is:

public JobManagerRunnerImpl(
            final JobGraph jobGraph,
            final JobMasterServiceFactory jobMasterFactory,
            final HighAvailabilityServices haServices,
            final LibraryCacheManager.ClassLoaderLease classLoaderLease,
            final Executor executor,
            final FatalErrorHandler fatalErrorHandler,
            long initializationTimestamp) throws Exception {

        this.resultFuture = new CompletableFuture<>();
        this.terminationFuture = new CompletableFuture<>();
        this.leadershipOperation = CompletableFuture.completedFuture(null);

        this.jobGraph = checkNotNull(jobGraph);
        this.classLoaderLease = checkNotNull(classLoaderLease);
        this.executor = checkNotNull(executor);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

        // libraries and class loader first
        final ClassLoader userCodeLoader;
        try {
            userCodeLoader = classLoaderLease.getOrResolveClassLoader(
                jobGraph.getUserJarBlobKeys(),
                jobGraph.getClasspaths()).asClassLoader();
        } catch (IOException e) {
            throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
        }

        // high availability services next
        this.runningJobsRegistry = haServices.getRunningJobsRegistry();
        this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

        this.leaderGatewayFuture = new CompletableFuture<>();

        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
    }

Besides preparing the class loader, the executor, and the leader election service, the constructor creates the jobMasterService. The core of that is jobMasterFactory.createJobMasterService; the default jobMasterFactory is DefaultJobMasterServiceFactory, and org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory#createJobMasterService creates a JobMaster object. Before going into how the JobMaster is constructed, look at JobManagerRunnerImpl#start first:

    @Override
    public void start() throws Exception {
        try {
            // The contender passed in here is this JobManagerRunnerImpl; it is called back once the election completes
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

In our scenario the leaderElectionService is a StandaloneLeaderElectionService. StandaloneLeaderElectionService#start looks like this:

    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
        }
        contender = Preconditions.checkNotNull(newContender);
        // directly grant leadership to the given contender
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }

The LeaderContender passed in is the JobManagerRunnerImpl instance, so execution reaches JobManagerRunnerImpl#grantLeadership, which is the callback invoked once the JobManager leader has been determined:

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
                return;
            }
            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });
            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }
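Reassigning leadershipOperation to leadershipOperation.thenCompose(...) is what serializes leadership changes: each new operation only starts after the previous one has finished. A small sketch of that chaining, with hypothetical names and a log list added purely to make the ordering visible:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of serializing asynchronous operations by chaining each one onto
// the previous future. Hypothetical names; not Flink's JobManagerRunnerImpl.
class LeadershipChainSketch {
    private final Object lock = new Object();
    private CompletableFuture<Void> leadershipOperation = CompletableFuture.completedFuture(null);
    final List<String> log = new CopyOnWriteArrayList<>();

    void grantLeadership(String sessionId, CompletableFuture<Void> startStep) {
        synchronized (lock) {
            // The new operation waits for whatever operation was queued before it.
            leadershipOperation = leadershipOperation.thenCompose(ignored -> {
                synchronized (lock) {
                    log.add("start " + sessionId);
                    return startStep.thenRun(() -> log.add("started " + sessionId));
                }
            });
        }
    }

    public static void main(String[] args) {
        LeadershipChainSketch runner = new LeadershipChainSketch();
        CompletableFuture<Void> slowStart = new CompletableFuture<>();
        runner.grantLeadership("a", slowStart);
        runner.grantLeadership("b", CompletableFuture.completedFuture(null));
        System.out.println("before: " + runner.log); // "b" has not begun yet
        slowStart.complete(null);
        System.out.println("after: " + runner.log);
    }
}
```

Even though the second start step is already complete, it does not begin until the first one finishes.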

Next, verifyJobSchedulingStatusAndStartJobManager:

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    // Start the JobMaster
                    return startJobMaster(leaderSessionId);
                }
            });
    }

As shown, this method calls startJobMaster to start the JobMaster:

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
                leaderSessionId,
                jobMasterService.getAddress(),
                currentLeaderGatewayFuture),
            executor);
    }

In the code above, the call to focus on is org.apache.flink.runtime.jobmaster.JobMaster#start:

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();

        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }

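The start() call triggers the endpoint's onStart() callback; why that happens is explained in the earlier article "flink taskmanager启动篇". JobMaster uses the empty onStart implementation from RpcEndpoint. The method to look at is org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution: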

    //-- job starting and stopping  -----------------------------------------------------------------
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();
        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
            return Acknowledge.get();
        }
        setNewFencingToken(newJobMasterId);
        startJobMasterServices();
        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        resetAndStartScheduler();
        return Acknowledge.get();
    }
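The fencing-token comparison at the top of startJobExecution makes the method idempotent for a given leader session: a second call with the same id returns early without restarting anything. A minimal sketch of that guard, using hypothetical names and a counter in place of the real services:

```java
import java.util.Objects;
import java.util.UUID;

// Sketch of an idempotent start guarded by a fencing token.
// Hypothetical names; the counter stands in for starting real services.
class FencingTokenSketch {
    private UUID fencingToken;
    int servicesStarted;

    // Returns true if the services were started, false if this token was already active.
    boolean startExecution(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            return false; // already started under this leader session
        }
        fencingToken = newToken;
        servicesStarted++;
        return true;
    }

    public static void main(String[] args) {
        FencingTokenSketch master = new FencingTokenSketch();
        UUID session = UUID.randomUUID();
        System.out.println(master.startExecution(session));
        System.out.println(master.startExecution(session));
        System.out.println("services started: " + master.servicesStarted);
    }
}
```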

We focus on two methods here: startJobMasterServices and resetAndStartScheduler.

First, startJobMasterServices:

private void startJobMasterServices() throws Exception {
        startHeartbeatServices();
        // Start the slotPool
        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        // The listener is called back whenever the ResourceManager gets a new leader
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }

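This performs some initialization:

• Starts the heartbeat services, which maintain heartbeats with the TaskManagers and the ResourceManager.

• slotPool.start sets up periodic checks on the slot pool, such as checking for idle slots and checking batch slot requests for timeouts, and logs scheduling information depending on the configured log level.

• reconnectToResourceManager: if the ResourceManager leader address is already known, a connection is attempted here; once connected, the slotPool can request slots.

• resourceManagerLeaderRetriever.start, whose code is: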

      @Override
      public void start(LeaderRetrievalListener listener) {
          checkNotNull(listener, "Listener must not be null.");
          synchronized (startStopLock) {
              checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
              started = true;
              // directly notify the listener, because we already know the leading JobManager's address
              // When a TaskExecutor starts, this directly triggers org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress, because the leader's address is already known at this point
              // When a JobMaster starts, this triggers org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener#notifyLeaderAddress
              listener.notifyLeaderAddress(leaderAddress, leaderId);
          }
      }

Next, inside org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener#notifyLeaderAddress:

  @Override
          public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
              runAsync(
                  () -> notifyOfNewResourceManagerLeader(
                      leaderAddress,
                      ResourceManagerId.fromUuidOrNull(leaderSessionID)));
          }

Then org.apache.flink.runtime.jobmaster.JobMaster#notifyOfNewResourceManagerLeader:

      private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
          resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);

          reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
      }

So the JobMaster connects to the new ResourceManager leader. Here is what that involves:

      private void reconnectToResourceManager(Exception cause) {
          closeResourceManagerConnection(cause);
          tryConnectToResourceManager();
      }

      private void tryConnectToResourceManager() {
          if (resourceManagerAddress != null) {
              connectToResourceManager();
          }
      }

  private void connectToResourceManager() {
          assert(resourceManagerAddress != null);
          assert(resourceManagerConnection == null);
          assert(establishedResourceManagerConnection == null);
          log.info("Connecting to ResourceManager {}", resourceManagerAddress);
          resourceManagerConnection = new ResourceManagerConnection(
              log,
              jobGraph.getJobID(),
              resourceId,
              getAddress(),
              getFencingToken(),
              resourceManagerAddress.getAddress(),
              resourceManagerAddress.getResourceManagerId(),
              scheduledExecutorService);
          // Registration is retried if connecting to the ResourceManager fails
          resourceManagerConnection.start();
      }

Next is resourceManagerConnection's start method.

It does two things: it creates a new registration and then starts it. Taking them in turn:

• createNewRegistration:

RetryingRegistration maintains a CompletableFuture internally; when that future completes successfully, onRegistrationSuccess is called back.
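The idea of a registration object that owns a future, retries failed attempts, and fires a success callback can be sketched like this. It is deliberately simplified and the names are hypothetical: the retry count is bounded and retries happen immediately, which is an assumption of this sketch rather than a description of how RetryingRegistration schedules its attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of a registration that retries until it succeeds (or gives up)
// and notifies a listener through its completion future.
// Hypothetical and simplified; not Flink's RetryingRegistration.
class RetryingRegistrationSketch<S> {
    private final CompletableFuture<S> completionFuture = new CompletableFuture<>();
    private final Supplier<CompletableFuture<S>> attempt;
    private final int maxAttempts;
    int attemptsMade;

    RetryingRegistrationSketch(Supplier<CompletableFuture<S>> attempt, int maxAttempts, Consumer<S> onSuccess) {
        this.attempt = attempt;
        this.maxAttempts = maxAttempts;
        // The success callback hangs off the internal future.
        completionFuture.thenAccept(onSuccess);
    }

    void startRegistration() {
        register(1);
    }

    private void register(int attemptNumber) {
        attemptsMade = attemptNumber;
        attempt.get().whenComplete((success, failure) -> {
            if (failure == null) {
                completionFuture.complete(success);
            } else if (attemptNumber < maxAttempts) {
                register(attemptNumber + 1); // try again
            } else {
                completionFuture.completeExceptionally(failure);
            }
        });
    }

    CompletableFuture<S> getFuture() {
        return completionFuture;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        RetryingRegistrationSketch<String> registration = new RetryingRegistrationSketch<>(() -> {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (++calls[0] < 3) {
                result.completeExceptionally(new RuntimeException("connection refused"));
            } else {
                result.complete("registered");
            }
            return result;
        }, 5, s -> System.out.println("onRegistrationSuccess: " + s));
        registration.startRegistration();
        System.out.println("attempts: " + registration.attemptsMade);
    }
}
```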

Here is what org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerConnection#onRegistrationSuccess does:

@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
            establishResourceManagerConnection(success);
        }
    });
}

Next, org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection, pasted as-is:

private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
        final ResourceManagerId resourceManagerId = success.getResourceManagerId();
        // verify the response with current connection
        if (resourceManagerConnection != null
                && Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
            log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);
            final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
            final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
            establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
                resourceManagerGateway,
                resourceManagerResourceId);
            slotPool.connectToResourceManager(resourceManagerGateway);
            resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    // request heartbeat will never be called on the job manager side
                }
            });
        } else {
            log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId);

        }
    }

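Two things happen here: 1. the slotPool is connected to the ResourceManager; 2. heartbeat monitoring of the ResourceManager is set up. We focus on the former, so let's go straight to the org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#connectToResourceManager method: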

    @Override
    public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
        // work on all slots waiting for this connection
        for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
            requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        // all sent off
        waitingForResourceManager.clear();
    }

This method takes each pendingRequest held in waitingForResourceManager, requests a slot from the ResourceManager for it, and then clears the waiting map.
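
The buffer-then-flush behaviour can be sketched as a small standalone model. This is not Flink's SlotPoolImpl: `PendingSlotPool` and `RecordingGateway` are hypothetical names, requests are reduced to plain strings, and the rule that requests arriving before the connection are parked in the waiting map is an assumption drawn from the "waiting for this connection" comment in the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical gateway: it only records the requests it receives.
final class RecordingGateway {
    final List<String> received = new ArrayList<>();

    void requestSlot(String requestId) {
        received.add(requestId);
    }
}

// Simplified model of the buffering behaviour, not Flink's SlotPoolImpl.
final class PendingSlotPool {
    // Requests that arrived before a gateway was available, in arrival order.
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private RecordingGateway gateway; // null until connected

    void requestSlot(String requestId) {
        if (gateway == null) {
            // Assumption: park the request until the connection exists.
            waitingForResourceManager.put(requestId, requestId);
        } else {
            gateway.requestSlot(requestId);
        }
    }

    // Same shape as connectToResourceManager: flush everything, then clear.
    void connectToResourceManager(RecordingGateway newGateway) {
        this.gateway = Objects.requireNonNull(newGateway);
        for (String pending : waitingForResourceManager.values()) {
            newGateway.requestSlot(pending);
        }
        waitingForResourceManager.clear();
    }

    int pendingCount() {
        return waitingForResourceManager.size();
    }
}
```

In this model, requests made before `connectToResourceManager` are forwarded in arrival order at connect time, and later requests go straight to the gateway.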

•The newRegistration.startRegistration() method performs the actual connection to the ResourceManager. It is not the focus of this article, so we won't analyze it further here.
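
Taken together, the two steps amount to a future that carries the success callback plus an attempt loop that retries on failure. A minimal sketch under stated assumptions: `SimpleRetryingRegistration` is a hypothetical class, not Flink's RetryingRegistration, and it retries immediately and synchronously up to a fixed attempt count, a simplification chosen only to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified, synchronous model of a retrying registration (hypothetical class).
final class SimpleRetryingRegistration<S> {
    private final CompletableFuture<S> completionFuture = new CompletableFuture<>();
    private final Supplier<S> attempt; // one registration attempt; throws on failure
    private final int maxAttempts;     // assumption: a fixed retry budget
    private int attemptsMade;

    SimpleRetryingRegistration(Supplier<S> attempt, int maxAttempts, Consumer<S> onSuccess) {
        this.attempt = attempt;
        this.maxAttempts = maxAttempts;
        // The success callback hangs off the future, like onRegistrationSuccess.
        completionFuture.thenAccept(onSuccess);
    }

    void startRegistration() {
        RuntimeException last = null;
        while (attemptsMade < maxAttempts) {
            attemptsMade++;
            try {
                completionFuture.complete(attempt.get());
                return; // registered; the callback has already run
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        completionFuture.completeExceptionally(
            last != null ? last : new IllegalStateException("no attempts allowed"));
    }

    CompletableFuture<S> getFuture() {
        return completionFuture;
    }

    int attemptsMade() {
        return attemptsMade;
    }
}
```

Attaching the callback to the future rather than calling it inside the loop keeps the retry logic independent of what the owner does on success.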

Next, the JobMaster#resetAndStartScheduler method

Before analyzing JobMaster#resetAndStartScheduler, let's look at the JobMaster constructor. The constructor is fairly long, so we focus on how schedulerNG is instantiated. The relevant snippet is:

// Create the DefaultScheduler
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);

The JobMaster#createScheduler method is as follows:

This is where the SchedulerNG object is created. Here we focus on how the SchedulerBase constructor builds the executionGraph. Part of the code is:

        // Create and restore the ExecutionGraph
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
        // The scheduling topology holds partition and pipelined-region information
        this.schedulingTopology = executionGraph.getSchedulingTopology();

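This is the logic that generates the executionGraph from the jobGraph. A later article will cover it in detail.

Now back to the JobMaster#resetAndStartScheduler method, whose code is as follows:

If schedulerNG's requestJobStatus returns JobStatus.CREATED, the current mainThreadExecutor is assigned to schedulerNG. Otherwise a new SchedulerNG instance is created, and its mainThreadExecutor is set once creation completes. After the logic in schedulerAssignedFuture finishes, the startScheduling method runs: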
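
The branching described above can be modelled in a few lines. This is a toy model, not the Flink source: `ToyJobStatus`, `ToyScheduler` and `ToyJobMaster` are hypothetical names, the replacement scheduler is built synchronously, and setting the executor is reduced to a flag.

```java
import java.util.concurrent.CompletableFuture;

enum ToyJobStatus { CREATED, RUNNING }

// Hypothetical scheduler stand-in with only the pieces the paragraph mentions.
final class ToyScheduler {
    final String name;
    ToyJobStatus status;
    boolean executorSet;
    boolean started;

    ToyScheduler(String name, ToyJobStatus status) {
        this.name = name;
        this.status = status;
    }

    void setMainThreadExecutor() {
        executorSet = true;
    }

    void startScheduling() {
        started = true;
        status = ToyJobStatus.RUNNING;
    }
}

final class ToyJobMaster {
    ToyScheduler scheduler;

    ToyJobMaster(ToyScheduler initial) {
        this.scheduler = initial;
    }

    CompletableFuture<Void> resetAndStartScheduler() {
        final CompletableFuture<Void> schedulerAssignedFuture;
        if (scheduler.status == ToyJobStatus.CREATED) {
            // Untouched scheduler: reuse it and hand it the executor.
            scheduler.setMainThreadExecutor();
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
        } else {
            // Scheduler already used: build a replacement, then set the executor.
            ToyScheduler replacement = new ToyScheduler(scheduler.name + "-new", ToyJobStatus.CREATED);
            replacement.setMainThreadExecutor();
            scheduler = replacement;
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
        }
        // Scheduling starts only after the scheduler has been assigned.
        return schedulerAssignedFuture.thenRun(() -> scheduler.startScheduling());
    }
}
```

Chaining `startScheduling` onto the future keeps both branches on one code path, which matters once the replacement branch becomes genuinely asynchronous.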

// JobMaster#startScheduling
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    schedulerNG.startScheduling();
}

// SchedulerBase#startScheduling
@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    startSchedulingInternal();
}

// DefaultScheduler#startSchedulingInternal
@Override
protected void startSchedulingInternal() {
    log.info(
        "Starting scheduling with scheduling strategy [{}]",
        schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    schedulingStrategy.startScheduling();
}

The real scheduling work happens inside startScheduling. Take PipelinedRegionSchedulingStrategy#startScheduling, the strategy used here, as an example:

@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
        .toStream(schedulingTopology.getAllPipelinedRegions())
        .filter(region -> !region.getConsumedResults().iterator().hasNext())
        .collect(Collectors.toSet());
    maybeScheduleRegions(sourceRegions);
}

private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        // Handle each region
        maybeScheduleRegion(region);
    }
}

private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    if (!areRegionInputsAllConsumable(region)) {
        return;
    }
    checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
    final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
        SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
            regionVerticesSorted.get(region),
            id -> deploymentOption);
    // Allocate slots and deploy
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
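
The filter at the top of startScheduling picks the regions that consume no results, that is, the source regions. A standalone sketch of just that selection step: `ToyRegion` and `SourceRegionFinder` are hypothetical names, and a region is reduced to a name plus the names of the results it consumes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical region: a name plus the names of the results it consumes.
final class ToyRegion {
    final String name;
    final List<String> consumedResults;

    ToyRegion(String name, String... consumedResults) {
        this.name = name;
        this.consumedResults = Arrays.asList(consumedResults);
    }
}

final class SourceRegionFinder {
    // Same predicate as in startScheduling: a source region consumes nothing.
    static List<String> sourceRegionNames(List<ToyRegion> regions) {
        return regions.stream()
            .filter(region -> !region.consumedResults.iterator().hasNext())
            .map(region -> region.name)
            .collect(Collectors.toList());
    }
}
```

For example, given a region `source` with no inputs and a region `map` consuming `r1`, only `source` is selected for the initial scheduling pass.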

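Internally, the subtasks are scheduled and deployed region by region. For reasons of space, we won't go further into task submission and deployment here; a separate article will cover it.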

Originally published on 2021-08-28. This article is shared from the WeChat official account 开发架构二三事.
