
Flink local mode startup: a full source-code walkthrough

山行AI
Published 2020-05-22 16:06:39
This article is part of the column: 山行AI

This article uses a WordCount example to give a brief analysis of how key Flink components, such as the TaskManager and the JobMaster, are started.

Example code
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

object WordCount {

  def main(args: Array[String]): Unit = {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }

    // text.flatMap(_.toLowerCase.split("\\W+").filter(p => p.nonEmpty)).map(a => (a, 1)).groupBy(0).sum(1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      // print() triggers job execution by itself, so env.execute() is not called here
      counts.print()
    }
  }
}
```
 

This program runs a WordCount job. We use this demo as the entry point and record the whole execution flow step by step.
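For reference, the transformation chain in the demo (flatMap that lowercases and splits on `\W+`, drop empty tokens, map to `(word, 1)`, `groupBy(0)`, `sum(1)`) computes an ordinary word count. The sketch below reproduces only those semantics with the plain JDK on a single machine; it is not Flink code, and the class name `WordCountSketch` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Single-machine model of what the WordCount pipeline computes (not Flink API). */
final class WordCountSketch {
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make output readable
        for (String line : lines) {
            // flatMap: lowercase, split on non-word characters
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.isEmpty()) {
                    continue; // filter { _.nonEmpty }
                }
                // map to (token, 1), then groupBy(0).sum(1)
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

In Flink the same grouping and summing is distributed across parallel tasks; the result per word is the same.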

JobGraph generation flow

For now this section simply records the call chain in order; later articles will analyze each step in detail.

org.apache.flink.api.scala.DataSet#print:
```scala
def print(): Unit = {
  javaSet.print()
}
```
org.apache.flink.api.java.DataSet#print()
```java
public void print() throws Exception {
    List<T> elements = collect();
    for (T e : elements) {
        System.out.println(e);
    }
}
```
org.apache.flink.api.java.DataSet#collect
```java
public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();

    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        // ... (rest omitted)
```

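This is where the job is executed and its results are collected back to the client: collect() attaches a CollectHelper sink under a unique accumulator id, calls execute(), and then reads the serialized elements from the accumulator with that id.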
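The collect() mechanism can be modeled as follows: the sink serializes each element into a list stored under a unique accumulator name, and after the job finishes the client looks that name up and deserializes the bytes. This is a simplified plain-JDK sketch; `CollectSketch`, `JobResult` and `runJob` are hypothetical stand-ins, a `UUID` replaces Flink's `AbstractID`, and UTF-8 strings replace the real `TypeSerializer`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class CollectSketch {
    /** Hypothetical stand-in for JobExecutionResult: accumulator results keyed by name. */
    static final class JobResult {
        final Map<String, List<byte[]>> accumulators = new HashMap<>();

        List<byte[]> getAccumulatorResult(String id) {
            return accumulators.get(id);
        }
    }

    /** "Runs" a job whose sink writes every element into the accumulator named id. */
    static JobResult runJob(List<String> data, String id) {
        List<byte[]> acc = new ArrayList<>();
        for (String element : data) {
            acc.add(element.getBytes(StandardCharsets.UTF_8)); // serialize on the task side
        }
        JobResult result = new JobResult();
        result.accumulators.put(id, acc);
        return result;
    }

    static List<String> collect(List<String> data) {
        String id = UUID.randomUUID().toString();   // unique accumulator name
        JobResult res = runJob(data, id);            // execute() returns after the job finishes
        List<byte[]> accResult = res.getAccumulatorResult(id);
        List<String> out = new ArrayList<>();
        if (accResult != null) {
            for (byte[] bytes : accResult) {
                out.add(new String(bytes, StandardCharsets.UTF_8)); // deserialize on the client
            }
        }
        return out;
    }
}
```

Because the results travel through accumulators, this path is only suited to small result sets.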

org.apache.flink.api.java.ExecutionEnvironment#execute()
```java
public JobExecutionResult execute() throws Exception {
    return execute(getDefaultName());
}
```
org.apache.flink.api.java.ExecutionEnvironment#execute(java.lang.String)
```java
public JobExecutionResult execute(String jobName) throws Exception {
    final JobClient jobClient = executeAsync(jobName);

    try {
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
        } else {
            lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        // ... (rest omitted)
```
 

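This is the method that executes the job: it submits the job through executeAsync and, in attached mode, blocks until the job result is available; in detached mode it returns a DetachedJobExecutionResult that only carries the JobID.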
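The attached/detached branch in execute(String) boils down to "wait on the job's future" versus "return immediately with only the job id". A minimal plain-JDK sketch of that decision, assuming a hypothetical `Client` handle whose future completes when the job finishes (these classes are illustrative, not Flink's `JobClient` or `JobExecutionResult`):

```java
import java.util.concurrent.CompletableFuture;

final class ExecuteSketch {
    /** Hypothetical stand-in for the handle returned by executeAsync. */
    static final class Client {
        final String jobId;
        final CompletableFuture<Long> netRuntimeMillis; // completes when the job finishes

        Client(String jobId, CompletableFuture<Long> netRuntimeMillis) {
            this.jobId = jobId;
            this.netRuntimeMillis = netRuntimeMillis;
        }
    }

    /** Hypothetical result: attached results carry the runtime, detached ones only the job id. */
    static final class Result {
        final String jobId;
        final boolean detached;
        final Long netRuntimeMillis; // null when detached

        Result(String jobId, boolean detached, Long netRuntimeMillis) {
            this.jobId = jobId;
            this.detached = detached;
            this.netRuntimeMillis = netRuntimeMillis;
        }
    }

    static Result execute(Client client, boolean attached) {
        if (attached) {
            // attached: block until the job has finished
            return new Result(client.jobId, false, client.netRuntimeMillis.join());
        }
        // detached: return at once; only the job id is known
        return new Result(client.jobId, true, null);
    }
}
```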

org.apache.flink.api.java.ExecutionEnvironment#executeAsync(java.lang.String)
```java
@PublicEvolving
public JobClient executeAsync(String jobName) throws Exception {
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    // Both StreamGraph and Plan are of type Pipeline
    // Build the execution plan
    final Plan plan = createProgramPlan(jobName);
    // Look up the pipeline executor factory
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
        executorFactory,
        "Cannot find compatible factory for specified execution.target (=%s)",
        configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(plan, configuration);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
        // ... (rest omitted)
```
 

Pipeline has two implementations: Plan and StreamGraph. This DataSet example produces a Plan; a streaming (DataStream) job would produce a StreamGraph instead.
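Before the Pipeline reaches an executor, executeAsync picks a PipelineExecutorFactory based on the `execution.target` option and fails if none is compatible. The sketch below models that lookup with the plain JDK. `ExecutorFactory`, `forTarget` and `find` are hypothetical, and treating "several matching factories" as an error is an assumption of this sketch rather than a statement about Flink's loader.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ExecutorLookupSketch {
    /** Hypothetical, trimmed-down pipeline executor factory. */
    interface ExecutorFactory {
        String name();

        boolean isCompatibleWith(Map<String, String> config);
    }

    /** A factory that accepts exactly one value of "execution.target". */
    static ExecutorFactory forTarget(String target) {
        return new ExecutorFactory() {
            @Override
            public String name() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> config) {
                return target.equalsIgnoreCase(config.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory; zero or several matches are errors here. */
    static ExecutorFactory find(List<ExecutorFactory> factories, Map<String, String> config) {
        List<ExecutorFactory> matches = factories.stream()
            .filter(f -> f.isCompatibleWith(config))
            .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException("Expected one factory for execution.target="
                + config.get("execution.target") + " but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

In this walkthrough the target is local, so the lookup ends at LocalExecutor.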

Let's continue following the job execution.

org.apache.flink.client.deployment.executors.LocalExecutor#execute
```java
@Override
public CompletableFuture<? extends JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}
```
 

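Here the pipeline is either a Plan or a StreamGraph. getJobGraph converts it into a JobGraph, which is then submitted through submitJob. Note that the local executor only supports attached execution.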
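LocalExecutor#execute builds its effective configuration by copying the executor's own configuration first and the per-job configuration second, so per-job values win for duplicate keys, and it rejects non-attached jobs. A plain-JDK sketch of those two steps, using `Map<String, String>` in place of Flink's `Configuration` (an assumption for illustration); `execution.attached` is the key behind `DeploymentOptions.ATTACHED`, and a missing value is treated as `false` here.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalExecutorSketch {
    /** Later sources win: executor defaults first, then the per-job configuration. */
    static Map<String, String> effectiveConfig(Map<String, String> executorConfig,
                                               Map<String, String> jobConfig) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorConfig);
        effective.putAll(jobConfig); // overwrites keys already present
        return effective;
    }

    /** Models checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)). */
    static void checkAttached(Map<String, String> jobConfig) {
        if (!Boolean.parseBoolean(jobConfig.get("execution.attached"))) {
            throw new IllegalStateException("the local executor only supports attached execution");
        }
    }
}
```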

org.apache.flink.client.program.PerJobMiniClusterFactory#submitJob
```java
/**
 * Starts a {@link MiniCluster} and submits a job.
 */
public CompletableFuture<? extends JobClient> submitJob(JobGraph jobGraph) throws Exception {
    MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    miniCluster.start();

    return miniCluster
        .submitJob(jobGraph)
        .thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
        .whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // We failed to create the JobClient and must shutdown to ensure cleanup.
                shutDownCluster(miniCluster);
            }
        });
}
```
 

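Submitting the job creates a dedicated MiniCluster, and miniCluster.start() performs the cluster startup work. If creating the JobClient fails, the cluster is shut down again so that nothing leaks.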
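The per-job pattern (start a cluster, submit, wrap the result in a client, shut the cluster down only if that chain fails) can be modeled with `CompletableFuture.whenComplete`. In this sketch `FakeCluster` is a hypothetical stand-in for MiniCluster that only tracks whether it is running, and the string `"client-for-…"` stands in for PerJobMiniClusterJobClient.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class PerJobClusterSketch {
    /** Hypothetical stand-in for MiniCluster. */
    static final class FakeCluster {
        final AtomicBoolean running = new AtomicBoolean(false);

        void start() { running.set(true); }

        void shutDown() { running.set(false); }

        CompletableFuture<String> submitJob(String jobId, boolean fail) {
            if (fail) {
                return CompletableFuture.failedFuture(new RuntimeException("submission failed"));
            }
            return CompletableFuture.completedFuture(jobId);
        }
    }

    static CompletableFuture<String> submit(FakeCluster cluster, String jobId, boolean fail) {
        cluster.start(); // one cluster per job
        return cluster.submitJob(jobId, fail)
            .thenApply(id -> "client-for-" + id)   // wrap the result in a client handle
            .whenComplete((client, error) -> {
                if (error != null) {
                    cluster.shutDown();            // no client exists to do the cleanup later
                }
            });
    }
}
```

On success the cluster stays up, because the returned client is now responsible for it.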

org.apache.flink.runtime.minicluster.MiniCluster#start:

```java
public void start() throws Exception {
    synchronized (lock) {
        // ... (omitted)
        miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // single (shared) RPC service mode
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createLocalRpcService(configuration);
                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // other mode
                // start a new service per component, possibly with custom bind addresses
                // ... (omitted)

                // we always need the 'commonRpcService' for auxiliary calls
                // bind to the JobManager address with port 0
                commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0);
            }

            // start the metrics query RPC (actor) service
            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
                metricRegistry,
                RpcUtils.getHostname(commonRpcService),
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
            // create the I/O executor (a fixed-size thread pool)
            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);
            // create and start the BlobServer
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();
            // create the heartbeat services
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );
            // start the TaskManagers
            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagreComponentRpcServiceFactory, metricQueryServiceRetriever);

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();
            // gateway retriever for the Dispatcher
            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            // gateway retriever for the ResourceManager
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            // web monitor
            webMonitorLeaderRetriever = new LeaderRetriever();
            // start leader retrieval for these components
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
            // ... (omitted)
```
 

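In the shared (single RPC service) mode, the common RpcService is created by commonRpcService = createLocalRpcService(configuration); in the other mode it is created by createRemoteRpcService, which builds the Akka actor-based RpcService through AkkaRpcServiceUtils.remoteServiceBuilder. The method below is createRemoteRpcService: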

```java
protected RpcService createRemoteRpcService(
        Configuration configuration,
        String bindAddress,
        int bindPort) throws Exception {
    return AkkaRpcServiceUtils.remoteServiceBuilder(configuration, bindAddress, String.valueOf(bindPort))
        .withBindAddress(bindAddress)
        .withBindPort(bindPort)
        .withCustomConfig(AkkaUtils.testDispatcherConfig())
        .createAndStart();
}
```
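The difference between the two RPC modes in MiniCluster#start is whether every component receives the same RpcService or its own. In the shared branch a CommonRpcServiceFactory hands out `commonRpcService`; in the other branch, as the code comment says, a new service is started per component (the exact code of that branch is elided above). The plain-JDK sketch below models only that distinction; `FakeRpcService` and both factory methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RpcSharingSketch {
    enum RpcServiceSharing { SHARED, DEDICATED }

    /** Hypothetical stand-in for an RpcService; each instance gets a distinct id. */
    static final class FakeRpcService {
        private static final AtomicInteger COUNTER = new AtomicInteger();
        final int id = COUNTER.incrementAndGet();
    }

    interface RpcServiceFactory {
        FakeRpcService createRpcService();
    }

    /** Shared mode: every component receives the same common service. */
    static RpcServiceFactory commonFactory(FakeRpcService common) {
        return () -> common;
    }

    /** Dedicated mode: every call creates a fresh service for one component. */
    static RpcServiceFactory dedicatedFactory() {
        return FakeRpcService::new;
    }

    static RpcServiceFactory forMode(RpcServiceSharing mode, FakeRpcService common) {
        return mode == RpcServiceSharing.SHARED ? commonFactory(common) : dedicatedFactory();
    }
}
```

Sharing one service keeps all in-process components behind a single actor system, which is the simpler setup for a local run.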
 

org.apache.flink.runtime.minicluster.MiniCluster#startTaskManagers:

```java
@GuardedBy("lock")
private void startTaskManagers() throws Exception {
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();

    LOG.info("Starting {} TaskManger(s)", numTaskManagers);

    for (int i = 0; i < numTaskManagers; i++) {
        startTaskExecutor();
    }
}
```
 

org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor:

```java
@VisibleForTesting
void startTaskExecutor() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
            configuration,
            new ResourceID(UUID.randomUUID().toString()),
            taskManagerRpcServiceFactory.createRpcService(),
            haServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            useLocalCommunication(),
            taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
```

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager (body not shown) builds the TaskManager's internal services through:

org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration:

```java
/**
 * Creates and returns the task manager services.
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param taskManagerMetricGroup metric group of the task manager
 * @param taskIOExecutor executor for async IO operations
 * @return task manager components
 * @throws Exception
 */
public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        MetricGroup taskManagerMetricGroup,
        Executor taskIOExecutor) throws Exception {

    // pre-start checks
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
    // create the TaskEventDispatcher
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // start the I/O manager, it will create some temp directories.
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup);
    final int listeningDataPort = shuffleEnvironment.start();
    // create and start the KvStateService
    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getExternalAddress(),
        // we expose the task manager location with the listening port
        // iff the external data port is not explicitly defined
        taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
            taskManagerServicesConfiguration.getExternalDataPort() :
            listeningDataPort);
    // broadcast variable manager
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
    // slot table
    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize());
    // job manager table
    final JobManagerTable jobManagerTable = new JobManagerTable();
    // JobLeaderService
    final JobLeaderService jobLeaderService = new JobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }
    // local state store manager
    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        taskIOExecutor);
    // I/O executor
    final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("taskexecutor-io"));
    // bundle all TaskManager services together
    return new TaskManagerServices(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobManagerTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher,
        ioExecutor);
}
```
 

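This creates the TaskManager's internal services (and starts some of them right away, such as the shuffle environment and the KvStateService), including org.apache.flink.runtime.taskexecutor.JobLeaderService.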
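Two small pieces of logic in fromConfiguration are easy to miss: the advertised data port is the explicitly configured external port if it is greater than 0, otherwise the port the shuffle environment actually bound; and each local-recovery root directory gets a fixed subdirectory appended. A plain-JDK sketch of both; the method names are hypothetical, and `subDir` stands in for `LOCAL_STATE_SUB_DIRECTORY_ROOT`, whose actual value is not shown in the excerpt.

```java
import java.io.File;

final class TaskManagerLocationSketch {
    /** Explicit external data port if configured (> 0), otherwise the bound listening port. */
    static int advertisedDataPort(int externalDataPort, int listeningDataPort) {
        return externalDataPort > 0 ? externalDataPort : listeningDataPort;
    }

    /** One local-state directory per configured root, each with the same subdirectory. */
    static File[] localStateRoots(String[] roots, String subDir) {
        File[] files = new File[roots.length];
        for (int i = 0; i < roots.length; ++i) {
            files[i] = new File(roots[i], subDir);
        }
        return files;
    }
}
```

Binding first and advertising afterwards lets the data port be chosen dynamically (for example port 0) while still publishing the real port in the TaskManager location.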

At this point the TaskManager has been initialized and started. Let's go back to org.apache.flink.runtime.minicluster.MiniCluster#submitJob:

```java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
```
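MiniCluster#submitJob waits for both the jar upload and the Dispatcher gateway with `thenCombine`, and since the gateway's own `submitJob` is asynchronous, the combined stage holds a nested future that `thenCompose(Function.identity())` flattens. The sketch reproduces that composition with the plain JDK; the `Gateway` interface and the `"submitted …"` result string are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SubmitCompositionSketch {
    /** Hypothetical gateway whose submitJob is itself asynchronous. */
    interface Gateway {
        CompletableFuture<String> submitJob(String jobId);
    }

    static CompletableFuture<String> submit(CompletableFuture<Void> jarUpload,
                                            CompletableFuture<Gateway> gatewayFuture,
                                            String jobId) {
        return jarUpload
            // runs only after BOTH futures completed; yields CompletableFuture<CompletableFuture<String>>
            .thenCombine(gatewayFuture, (Void ignored, Gateway gw) -> gw.submitJob(jobId))
            // flatten the nested future into CompletableFuture<String>
            .thenCompose(Function.identity())
            .thenApply(ack -> "submitted " + jobId);
    }
}
```

Usage: if the upload future is still pending, the gateway's `submitJob` is not called and the returned future stays incomplete until the upload finishes.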
 

org.apache.flink.runtime.dispatcher.Dispatcher#submitJob:

```java
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                    "resources configured. The limitation will be removed in future versions."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
```
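Dispatcher#submitJob reports rejections, such as a duplicate JobID, by returning an exceptionally completed future rather than throwing, so callers handle every outcome through the same async path. The sketch below models that with a set of known job ids. It is a simplification: how Flink's `isDuplicateJob` actually decides is not shown in the excerpt, and `DispatcherSketch` is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class DispatcherSketch {
    private final Set<String> knownJobs = new HashSet<>();

    CompletableFuture<String> submitJob(String jobId) {
        if (!knownJobs.add(jobId)) {
            // already seen: fail the future instead of throwing
            return CompletableFuture.failedFuture(
                new IllegalStateException("Duplicate job submission: " + jobId));
        }
        return CompletableFuture.completedFuture("ack");
    }
}
```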

org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob:

```java
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
        .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
        if (throwable != null) {
            cleanUpJobData(jobGraph.getJobID(), true);
            // ... (rest omitted)
```
 

Next, org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob:

```java
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);

    final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

    return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
        if (throwable != null) {
            jobGraphWriter.removeJobGraph(jobGraph.getJobID());
        }
    }));
}
```
 

org.apache.flink.runtime.dispatcher.Dispatcher#runJob:

```java
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}
```
 

org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner:

```java
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());
}
```
 

org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory#createJobManagerRunner creates a JobManagerRunnerImpl instance. Its constructor:

```java
public JobManagerRunnerImpl(
        final JobGraph jobGraph,
        final JobMasterServiceFactory jobMasterFactory,
        final HighAvailabilityServices haServices,
        final LibraryCacheManager libraryCacheManager,
        final Executor executor,
        final FatalErrorHandler fatalErrorHandler) throws Exception {

    this.resultFuture = new CompletableFuture<>();
    this.terminationFuture = new CompletableFuture<>();
    this.leadershipOperation = CompletableFuture.completedFuture(null);

    // make sure we cleanly shut down out JobManager services if initialization fails
    try {
        this.jobGraph = checkNotNull(jobGraph);
        this.libraryCacheManager = checkNotNull(libraryCacheManager);
        this.executor = checkNotNull(executor);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

        // libraries and class loader first
        try {
            libraryCacheManager.registerJob(
                jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        } catch (IOException e) {
            throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
        }

        final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
        if (userCodeLoader == null) {
            throw new Exception("The user code class loader could not be initialized.");
        }

        // high availability services next
        this.runningJobsRegistry = haServices.getRunningJobsRegistry();
        this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

        this.leaderGatewayFuture = new CompletableFuture<>();
        // note: this org.apache.flink.runtime.jobmaster.JobMasterService is not the same as
        // the job-master-related service on the TaskManager side
        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
    }
    // ... (rest omitted)
```
 

jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader):

```java
@Override
public JobMaster createJobMasterService(
        JobGraph jobGraph,
        OnCompletionActions jobCompletionActions,
        ClassLoader userCodeClassloader) throws Exception {

    return new JobMaster(
        rpcService,
        jobMasterConfiguration,
        ResourceID.generate(),
        jobGraph,
        haServices,
        slotPoolFactory,
        schedulerFactory,
        jobManagerSharedServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        jobCompletionActions,
        fatalErrorHandler,
        userCodeClassloader,
        schedulerNGFactory,
        shuffleMaster,
        lookup -> new JobMasterPartitionTrackerImpl(
            jobGraph.getJobID(),
            shuffleMaster,
            lookup
        ));
}
```
 

This is where the JobMaster is created.

org.apache.flink.runtime.dispatcher.Dispatcher#startJobManagerRunner:

```java
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // check if we are still the active JobManagerRunner by checking the identity
                final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                    .map(future -> future.getNow(null))
                    .orElse(null);
                //noinspection ObjectEquality
                // ... (omitted)
            }, getMainThreadExecutor()));

    jobManagerRunner.start();

    return jobManagerRunner;
}
```
 

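This is where the JobMaster gets started: jobManagerRunner.start() starts leader election for the job, and once leadership is granted, the JobMasterService created in the constructor above is started.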
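The indirection "start() only joins leader election; the JobMaster starts when leadership is granted" can be modeled with a contender callback. This is a plain-JDK sketch under stated assumptions: `LeaderContender`, `ImmediateElection` and `Runner` are hypothetical, and the election here grants leadership synchronously, whereas the election service a real MiniCluster uses may call back asynchronously.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

final class LeaderElectionSketch {
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Hypothetical election service with a single contender that wins immediately. */
    static final class ImmediateElection {
        void start(LeaderContender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Simplified runner: start() only registers; the "JobMaster" starts on grantLeadership. */
    static final class Runner implements LeaderContender {
        final AtomicReference<UUID> jobMasterSessionId = new AtomicReference<>();
        private final ImmediateElection election;

        Runner(ImmediateElection election) {
            this.election = election;
        }

        void start() {
            election.start(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            // stands in for starting the JobMaster with a fencing token derived from the session id
            jobMasterSessionId.set(leaderSessionId);
        }
    }
}
```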

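The local client, the TaskManager and the JobMaster each run on an (Akka actor-based) RPC service and communicate with one another through it, for example for heartbeats and task assignment.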
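Heartbeats between these components follow a simple rule: each side records when it last heard from a peer and treats the peer as lost once no heartbeat has arrived within a timeout. MiniCluster#start builds its HeartbeatServices from the configuration (the relevant Flink options are `heartbeat.interval` and `heartbeat.timeout`). The sketch below models only the timeout check, with explicit timestamps so it is deterministic; `HeartbeatSketch` is hypothetical and much simpler than Flink's heartbeat managers.

```java
import java.util.HashMap;
import java.util.Map;

final class HeartbeatSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Record a heartbeat from a peer at the given time. */
    void receiveHeartbeat(String peer, long nowMillis) {
        lastSeen.put(peer, nowMillis);
    }

    /** A peer counts as lost if it never reported or its last heartbeat is older than the timeout. */
    boolean isTimedOut(String peer, long nowMillis) {
        Long last = lastSeen.get(peer);
        return last == null || nowMillis - last > timeoutMillis;
    }
}
```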

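This article is shared from a WeChat public account as part of the Tencent Cloud self-media sync program.
Originally published: 2020-05-17. For infringement concerns, contact cloudcommunity@tencent.com to have it removed.

Shared from the WeChat public account 开发架构二三事.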
