专栏首页码字搬砖一文搞定 Flink Job 提交全流程

一文搞定 Flink Job 提交全流程

前言

前面,我们已经分析了 一文搞定 Flink 消费消息的全流程写给大忙人看的 Flink Window原理 还有 一文搞定 Flink Checkpoint Barrier 全流程 等等,接下来也该回归到最初始的时候,Flink Job 是如何提交的。

正文

我们知道 Flink 总共有两种提交模式:本地模式和远程模式( 当然也对应着不同的 environment,具体可以参考 Flink Context到底是什么?),我们以本地模式为主,两种模式基本上相似。

当我们执行 env.execute ,实际上调用了 LocalStreamEnvironment.execute 方法

/**
	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
	 * specified name.
	 *
	 * @param jobName
	 *            name of the job
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 */
	@Override
	// 本地模式执行方法 env.execute
	public JobExecutionResult execute(String jobName) throws Exception {
		// transform the streaming program into a JobGraph
		//TODO 111
		//获取 streamGraph
		StreamGraph streamGraph = getStreamGraph();
		streamGraph.setJobName(jobName);
		
		//获取 jobGraph
		JobGraph jobGraph = streamGraph.getJobGraph();
		jobGraph.setAllowQueuedScheduling(true);

		Configuration configuration = new Configuration();
		configuration.addAll(jobGraph.getJobConfiguration());
		configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

		// add (and override) the settings with what the user defined
		configuration.addAll(this.configuration);

		if (!configuration.contains(RestOptions.BIND_PORT)) {
			configuration.setString(RestOptions.BIND_PORT, "0");
		}

		int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
			.setConfiguration(configuration)
			.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
			.build();

		if (LOG.isInfoEnabled()) {
			LOG.info("Running job on local embedded Flink mini cluster");
		}

		MiniCluster miniCluster = new MiniCluster(cfg);

		try {
			//启动集群,包括启动JobMaster,进行leader选举等等
			miniCluster.start();
			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

			// 提交任务到JobMaster
			return miniCluster.executeJobBlocking(jobGraph);
		}
		finally {
			transformations.clear();
			miniCluster.close();
		}
	}

这里构建了 StreamGraph、JobGraph,到后面还会有 ExecutionGraph,关于这些图的一些东西,一张图就差不多了

当 miniCluster.start() 时

// start cluster
	public void start() throws Exception {
		synchronized (lock) {
			checkState(!running, "MiniCluster is already running");

			......
				ioExecutor = Executors.newFixedThreadPool(
					Hardware.getNumberCPUCores(),
					new ExecutorThreadFactory("mini-cluster-io"));
				//创建 HA service
				haServices = createHighAvailabilityServices(configuration, ioExecutor);

				//启动 blobServer
				blobServer = new BlobServer(configuration, haServices.createBlobStore());
				blobServer.start();

				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

				blobCacheService = new BlobCacheService(
					configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
				);

				// task executor
				startTaskManagers();
......
				resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
				dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
				webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
			......
	}

创建了 HaService,启动了 blobCacheService、resourceManagerLeaderRetriever、dispatcherLeaderRetriever、webMonitorLeaderRetrievalService,我们重点看一下 startTaskManagers

@VisibleForTesting
	void startTaskExecutor() throws Exception {
		synchronized (lock) {
			final Configuration configuration = miniClusterConfiguration.getConfiguration();

			final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
				configuration,
				new ResourceID(UUID.randomUUID().toString()),
				taskManagerRpcServiceFactory.createRpcService(),
				haServices,
				heartbeatServices,
				metricRegistry,
				blobCacheService,
				useLocalCommunication(),
				taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

			taskExecutor.start();
			taskManagers.add(taskExecutor);
		}
	}

TaskExecutor 创建了 TaskExecutor 并启动了。最终的用来submitTask、cancalTask、stopTask 、执行 task 、confirmCheckpoint、requestSlot、freeSlot 等等。

一些必要的组件已经启动成功,接下来该提交 jobGraph 了 miniCluster.executeJobBlocking(jobGraph); 跟踪代码

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
		final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
		
		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
		// from the ResourceManager
		jobGraph.setAllowQueuedScheduling(true);
		
		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
		
		// cache jars and files
		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
		
		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
			.thenCombine(
				dispatcherGatewayFuture,
				// 这里真正 submit 操作,交给了 dispatcher 去执行
				(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
			.thenCompose(Function.identity());
		
		return acknowledgeCompletableFuture.thenApply(
			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
	}

最终交给了 dispatcher 来进行 jobGraph 的提交,最终到这里

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

		//创建 job Manager runner
		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

		// start job manager
		return jobManagerRunnerFuture
			.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
			.thenApply(FunctionUtils.nullFn())
			.whenCompleteAsync(
				(ignored, throwable) -> {
					if (throwable != null) {
						jobManagerRunnerFutures.remove(jobGraph.getJobID());
					}
				},
				getMainThreadExecutor());
	}

这个时候开始创建 jobManagerRunner,在创建 jobManagerRunner 的同时也会创建 jobMaster

public JobMaster(
			RpcService rpcService,
			JobMasterConfiguration jobMasterConfiguration,
			ResourceID resourceId,
			JobGraph jobGraph,
			HighAvailabilityServices highAvailabilityService,
			SlotPoolFactory slotPoolFactory,
			SchedulerFactory schedulerFactory,
			JobManagerSharedServices jobManagerSharedServices,
			HeartbeatServices heartbeatServices,
			JobManagerJobMetricGroupFactory jobMetricGroupFactory,
			OnCompletionActions jobCompletionActions,
			FatalErrorHandler fatalErrorHandler,
			ClassLoader userCodeLoader) throws Exception {

		super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));

		this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
		this.resourceId = checkNotNull(resourceId);
		this.jobGraph = checkNotNull(jobGraph);
		this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
		this.blobWriter = jobManagerSharedServices.getBlobWriter();
		this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
		this.jobCompletionActions = checkNotNull(jobCompletionActions);
		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
		this.userCodeLoader = checkNotNull(userCodeLoader);
		this.heartbeatServices = checkNotNull(heartbeatServices);
		this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);

		final String jobName = jobGraph.getName();
		final JobID jid = jobGraph.getJobID();

		log.info("Initializing job {} ({}).", jobName, jid);

		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
				jobGraph.getSerializedExecutionConfig()
						.deserializeValue(userCodeLoader)
						.getRestartStrategy();

		// 设置重启策略
		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
			jobManagerSharedServices.getRestartStrategyFactory(),
			jobGraph.isCheckpointingEnabled());

		.....
		//TODO 111
		//createExecutionGraph 可能会 restore from checkpoint(savepoint)
		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
		......
	}

最关键的时在创建 jobMaster 的同时还 create executionGraph。然后开始启动 jobManagerRunner,最终会启动 jobMaster

private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

		try {
			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
		} catch (IOException e) {
			return FutureUtils.completedExceptionally(
				new FlinkException(
					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
					e));
		}

		final CompletableFuture<Acknowledge> startFuture;
		try {
			// 通过给定的 jobId start job master
			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
		} catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
		}

		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
		return startFuture.thenAcceptAsync(
			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
			executor);
	}

jobMaster 启动完,就会正式开始执行 job 了

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
		// make sure we receive RPC and async calls
		start();
		// 正式 开始执行  Job
		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
	}

开始正式执行 job

// start job execution
	private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

		validateRunsInMainThread();

		checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

		if (Objects.equals(getFencingToken(), newJobMasterId)) {
			log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

			return Acknowledge.get();
		}

		setNewFencingToken(newJobMasterId);

		startJobMasterServices();

		log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
		
		// 重新设置或者调度 executionGraph
		resetAndScheduleExecutionGraph();

		return Acknowledge.get();
	}

然后就开始调度 executionGraph 了

// 调度 execution
	public void scheduleForExecution() throws JobException {

		assertRunningInJobMasterMainThread();

		final long currentGlobalModVersion = globalModVersion;
		
		//改变 job 的状态,由 CREATED 变为 RUNNING
		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

			final CompletableFuture<Void> newSchedulingFuture;

			switch (scheduleMode) {

				case LAZY_FROM_SOURCES:
					newSchedulingFuture = scheduleLazy(slotProvider);
					break;

				case EAGER:
					// 300000 ms default
					//开始调度
					newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
					break;

				default:
					throw new JobException("Schedule mode is invalid.");
			}

			if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
				schedulingFuture = newSchedulingFuture;
				newSchedulingFuture.whenComplete(
					(Void ignored, Throwable throwable) -> {
						if (throwable != null && !(throwable instanceof CancellationException)) {
							// only fail if the scheduling future was not canceled
							failGlobal(ExceptionUtils.stripCompletionException(throwable));
						}
					});
			} else {
				newSchedulingFuture.cancel(false);
			}
		}
		else {
			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
		}
	}

调度了之后就开始 deploy 了

/**
	 * Deploys the execution to the previously assigned resource.
	 *
	 * @throws JobException if the execution cannot be deployed to the assigned resource
	 */
	// 从 source 到 sink 循环部署
	public void deploy() throws JobException {
		assertRunningInJobMasterMainThread();

		final LogicalSlot slot  = assignedResource;
		.....

			// TaskDeploymentDescriptor 这个类保存了 task 执行所必须的所有内容,
			// 例如序列化的算子,输入的 InputGate 和输出的 ResultPartition 的定义,该 task 要作为几个 subtask 执行等等。
			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
				attemptId,
				slot,
				taskRestore,
				attemptNumber);

			// null taskRestore to let it be GC'ed
			taskRestore = null;

			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

			final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
				vertex.getExecutionGraph().getJobMasterMainThreadExecutor();


			// We run the submission in the future executor so that the serialization of large TDDs does not block
			// the main thread and sync back to the main thread once submission is completed.
			// 提交 task 先 source
			// 对于 TM 来说,执行 task 就是把收到的 TaskDeploymentDescriptor 对象转换成一个 task 并执行的过程。
			CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
				.thenCompose(Function.identity())
				.whenCompleteAsync(
					(ack, failure) -> {
						// only respond to the failure case
						if (failure != null) {
							if (failure instanceof TimeoutException) {
								String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

								markFailed(new Exception(
									"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
										+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
							} else {
								markFailed(failure);
							}
						}
					},
					jobMasterMainThreadExecutor);

		}
		catch (Throwable t) {
			markFailed(t);
			ExceptionUtils.rethrow(t);
		}
	}

部署的过程当中可能会申请资源,然后就开始提交 task 了,再往下就开始执行 task 了。

总结

remote 模式:yarn

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink是如何kafka读取数据的

    版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https...

    shengjk1
  • Flink安装

    前提:安装:已安好hadoop,环境已经配好 java 7.X 及其以上 scala有对应版本 单机: 下载、解压、 需要注意:flink与hadoop版...

    shengjk1
  • Flink startupMode是如何起作用的

    版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https...

    shengjk1
  • 在XMLSignature中使用BouncyCastle做RSA

    There is an article shows demo code for making XMLSignature by using Java XML Di...

    用户3579639
  • 札记:翻译-使用Scene和Transition实现【场景切换】动画效果

    简述:transitions framework 下面翻译transition为“过渡”,强调动画过程的含义,不过更多时候使用transition单词本身。 ...

    用户1172465
  • 通过运行时单步调试弄清楚[(ngModel)]的双向绑定的工作原理

    我在UI输入一个a,因为双向绑定,Component的bookName值也会变成a,这是怎么实现的呢?

    Jerry Wang
  • Something about Pyth

    py3study
  • Linux sudo 命令使用简介

    #编辑/etc/sudoers (注意,这里使用 visudo 而不是 vi 来设置。)

    授客
  • Tour UVA - 1347

    John Doe, a skilled pilot, enjoys traveling. While on vacation, he rents a small...

    attack
  • NoSQL如何构建数据存储模型

    翻译内容:NoSQL Distilled 第三章 数据模型详解 3.5 Modeling for Data Access 作者简介: ? ...

    ImportSource

扫码关注云+社区

领取腾讯云代金券