
How a Flink Job Runs, All in One Article

shengjk1
Published 2021-04-25 14:37:43
Included in the column: 码字搬砖

Background

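Earlier articles covered how Flink generates the StreamGraph, how it builds the job, and how it generates Tasks. This article ties those pieces together, starting from the flink command-line script. The goal is to follow the entire process, from writing code to Flink running the tasks.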

Walkthrough

Jobs are usually submitted through the flink command-line script, for example: flink run -p 2 -m yarn-cluster -ynm test -c test ./test-1.0-SNAPSHOT.jar "file" "./test.properties" &. The flink script shows that org.apache.flink.client.cli.CliFrontend is the entry class for the whole Flink job.
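Here is what each part of that command does:

- -p 2 sets the parallelism.
- -m yarn-cluster submits to a per-job YARN cluster.
- -ynm test names the YARN application.
- -c test names the entry class.
- The first non-option token is the jar.
- Everything after the jar ("file" and "./test.properties") is passed to the user's main method as program arguments.
- The trailing & only tells the shell to run the command in the background.

The Java sketch below models that split. The RunArgs class and its parsing rules are hypothetical simplifications. They are not Flink's actual parser, which is built on Apache Commons CLI.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of how the arguments of `flink run` are split.
// Not Flink's parser; it only illustrates options vs. jar vs. program arguments.
final class RunArgs {
    // Options assumed (for this sketch) to take exactly one value.
    private static final Set<String> VALUED = Set.of("-p", "-m", "-ynm", "-c");

    final Map<String, String> options = new HashMap<>();
    final List<String> flags = new ArrayList<>();
    String jarFile;
    final List<String> programArgs = new ArrayList<>();

    static RunArgs parse(String... args) {
        RunArgs r = new RunArgs();
        int i = 0;
        // Consume options until the first non-option token, which is the jar path.
        while (i < args.length && args[i].startsWith("-")) {
            String opt = args[i++];
            if (VALUED.contains(opt)) {
                if (i >= args.length) {
                    throw new IllegalArgumentException("Missing value for " + opt);
                }
                r.options.put(opt, args[i++]);
            } else {
                r.flags.add(opt); // value-less switches such as -d (detached)
            }
        }
        if (i >= args.length) {
            throw new IllegalArgumentException("No jar file specified");
        }
        r.jarFile = args[i++];
        // Everything after the jar goes to the user's main(String[]) unchanged.
        r.programArgs.addAll(Arrays.asList(args).subList(i, args.length));
        return r;
    }

    public static void main(String[] ignored) {
        RunArgs r = parse("-p", "2", "-m", "yarn-cluster", "-ynm", "test", "-c", "test",
                "./test-1.0-SNAPSHOT.jar", "file", "./test.properties");
        // prints "test ./test-1.0-SNAPSHOT.jar [file, ./test.properties]"
        System.out.println(r.options.get("-c") + " " + r.jarFile + " " + r.programArgs);
    }
}
```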

The main method of CliFrontend:

```java
/**
 * Submits the job based on the arguments.
 */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
        configuration,
        configurationDirectory);

    try {
        final CliFrontend cli = new CliFrontend(
            configuration,
            customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // parse and execute the command (e.g. "run") inside the installed security context
        int retCode = SecurityUtils.getInstalledContext()
                .runSecured(() -> cli.parseParameters(args));
        System.exit(retCode);
    }
    catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
        System.exit(31);
    }
}
```

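main is simple and boils down to two steps. First, it locates and loads the configuration. Second, it loads the custom command lines and parses the command. When the command being parsed is run, the call chain leads all the way down to the executeProgram method: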

```java
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");
    final JobSubmissionResult result = client.run(program, parallelism);
    // ......
}
```
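The route from main down to executeProgram comes down to two small steps. First, resolve the configuration directory. Second, dispatch on the action, which is the first argument.

The sketch below is a simplified model, not Flink's code:

- CliSketch, resolveConfDir and dispatch are hypothetical names.
- The lookup order is an assumption modeled on getConfigurationDirectoryFromEnv: the FLINK_CONF_DIR environment variable first, then ../conf, then conf.
- Only the run action is routed. Every other action is treated as invalid.

```java
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the two steps in CliFrontend.main:
// (1) locate the configuration directory, (2) dispatch on the action.
final class CliSketch {
    static final String ENV_CONF_DIR = "FLINK_CONF_DIR";

    // Prefer the environment variable; otherwise try the assumed fallbacks in order.
    static String resolveConfDir(String envValue, Predicate<String> isDirectory) {
        if (envValue != null) {
            if (!isDirectory.test(envValue)) {
                throw new IllegalStateException("Configuration directory does not exist: " + envValue);
            }
            return envValue;
        }
        for (String candidate : new String[] {"../conf", "conf"}) {
            if (isDirectory.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No configuration directory found");
    }

    // Route "run" to its handler with the remaining arguments and return an exit code.
    static int dispatch(String[] args, Function<String[], Integer> runHandler) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        String[] rest = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                return runHandler.apply(rest);
            default:
                System.out.println("\"" + action + "\" is not a valid action.");
                return 1;
        }
    }

    public static void main(String[] ignored) {
        // Pretend only "conf" exists, so the second fallback is chosen.
        String dir = resolveConfDir(null, p -> p.equals("conf"));
        // Stand-in handler: succeeds (0) if it received the three remaining arguments.
        int code = dispatch(new String[] {"run", "-p", "2", "./job.jar"},
                rest -> rest.length == 3 ? 0 : 1);
        System.out.println(dir + " " + code); // prints "conf 0"
    }
}
```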

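executeProgram runs the program through client.run. This eventually calls the user's main method, the class given by -c, and from there the user code runs. The user code defines its transformations and finally calls execute(String jobName) on StreamContextEnvironment. As the code below shows, execute is where the StreamGraph is obtained.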
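Running the entry class given by -c is, at its core, reflection. The launcher loads the class through the user-code class loader, looks up its static main(String[]), and invokes it with the program arguments.

The sketch below shows only that mechanism. MainInvoker and DemoJob are hypothetical. Flink's PackagedProgram performs extra validation and class-loader handling that is left out here.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical sketch: running an entry class named by -c via reflection.
final class MainInvoker {
    static void callMain(String className, ClassLoader userCodeClassLoader, String[] programArgs) {
        try {
            // Load (and initialize) the entry class through the user-code class loader.
            Class<?> entryClass = Class.forName(className, true, userCodeClassLoader);
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException(className + ".main(String[]) is not static");
            }
            // Cast to Object so the array is passed as main's single argument, not spread as varargs.
            mainMethod.invoke(null, (Object) programArgs);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not run the entry class " + className, e);
        }
    }
}

// Stand-in for user code. In a real job, main would build the pipeline and call env.execute(...).
final class DemoJob {
    static String[] received;

    public static void main(String[] args) {
        received = args;
    }
}
```

For example, MainInvoker.callMain(DemoJob.class.getName(), DemoJob.class.getClassLoader(), new String[] {"file", "./test.properties"}) hands both program arguments to DemoJob.main.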

StreamContextEnvironment.execute(String jobName):

```java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the programs; the DetachedEnvironment branch is taken when -d (detached mode) is passed
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}
```
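The two branches of execute differ in when the job is actually submitted:

- In detached mode, the plan is only recorded and a placeholder result is returned immediately.
- In attached mode, the call blocks until the result arrives.

The model below captures that difference. Its names are hypothetical, and plain strings stand in for StreamGraph and JobExecutionResult. As far as we understand, in Flink the recorded plan is submitted by the client after the user's main method returns; that step is not modeled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of the two branches in StreamContextEnvironment.execute.
// Strings stand in for StreamGraph (the plan) and JobExecutionResult.
final class ExecuteSketch {
    static final String DETACHED_PLACEHOLDER = "<detached: result available on completion>";

    private String detachedPlan; // only set in detached mode

    String execute(String plan, boolean detached,
                   Function<String, CompletableFuture<String>> submit) {
        if (detached) {
            // Detached (-d): record the plan and return a placeholder result right away.
            detachedPlan = plan;
            return DETACHED_PLACEHOLDER;
        }
        // Attached: submit now and block until the job's result is available.
        return submit.apply(plan).join();
    }

    String detachedPlan() {
        return detachedPlan;
    }
}
```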

Next, the client's run method is called:

```java
public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException {
    // Build the JobGraph
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    // Submit the job to the cluster
    return submitJob(job, classLoader);
}
```

This method's main job is to build the JobGraph; for the details, see "How a Job Is Built". Once the JobGraph has been built, the job is submitted. We use MiniCluster as the example:

```java
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);
    // ......
}
```
The submitJob(JobGraph) method it calls:

```java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);

    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

    // cache jars and files
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            // the actual submission is delegated to the Dispatcher here
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
```
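The interesting part of submitJob is how the futures are chained:

1. thenCombine waits for both the jar upload and the Dispatcher gateway.
2. Its function returns yet another future, the RPC submitJob call.
3. thenCompose(Function.identity()) flattens that nested future.

The sketch below reproduces the pattern with JDK types only. The Gateway interface and the String acknowledgement are hypothetical stand-ins for DispatcherGateway and Acknowledge.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch of the future composition used in MiniCluster#submitJob, with stand-in types.
final class SubmitSketch {
    // Stand-in for DispatcherGateway: an asynchronous submit call returning an acknowledgement.
    interface Gateway {
        CompletableFuture<String> submit(String jobId);
    }

    static CompletableFuture<String> submit(String jobId,
                                            CompletableFuture<Void> uploadFuture,
                                            CompletableFuture<Gateway> gatewayFuture) {
        return uploadFuture
                // Wait for both inputs; the function itself returns a future, so the result is nested.
                .thenCombine(gatewayFuture, (Void ignored, Gateway gw) -> gw.submit(jobId))
                // Flatten CompletableFuture<CompletableFuture<String>> into CompletableFuture<String>.
                .thenCompose(Function.identity())
                // Map the acknowledgement to a "submission result" (here simply the job id).
                .thenApply(ack -> jobId);
    }
}
```

Nothing here blocks. The returned future completes only after both inputs have completed and the submit call has acknowledged. If either input fails, the failure propagates to the returned future.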

On the Dispatcher side, the submission eventually reaches the runJob method, where the job actually starts running:

```java
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    // Creating the JobManagerRunner also creates the JobMaster; the ExecutionGraph is built when the JobMaster is created
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    // Starting the JobManagerRunner also starts the JobMaster and related services; the ExecutionGraph is then
    // scheduled (execution.deploy -> task.start)
    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}
```
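The bookkeeping in runJob follows a common pattern. The future is registered under the job ID before anything runs, so a second submission of the same job is rejected. If creating or starting the runner fails, the entry is removed again. The cleanup runs on the main-thread executor because the Dispatcher keeps this map confined to its single RPC main thread.

Below is a JDK-only sketch of that pattern. RunJobSketch and its function parameters are hypothetical, and plain strings stand in for the JobGraph and the JobManagerRunner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical model of the bookkeeping in Dispatcher#runJob.
final class RunJobSketch {
    // Only touched from the "main thread" executor, so a plain HashMap suffices.
    final Map<String, CompletableFuture<String>> runnerFutures = new HashMap<>();

    CompletableFuture<Void> runJob(String jobId,
                                   Function<String, CompletableFuture<String>> createRunner,
                                   Function<String, String> startRunner,
                                   Executor mainThread) {
        if (runnerFutures.containsKey(jobId)) {
            throw new IllegalStateException("Job " + jobId + " is already running");
        }
        CompletableFuture<String> runnerFuture = createRunner.apply(jobId);
        // Register immediately so that a duplicate submission is rejected.
        runnerFutures.put(jobId, runnerFuture);
        return runnerFuture
                .thenApply(startRunner)
                .thenApply(started -> (Void) null)
                // Back on the main thread: forget the job if creating or starting the runner failed.
                .whenCompleteAsync((ignored, error) -> {
                    if (error != null) {
                        runnerFutures.remove(jobId);
                    }
                }, mainThread);
    }
}
```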

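From this point on, runJob follows the same flow described in "How a Job Is Built", so most of it is omitted here; see that article for details. One point worth stressing is what happens when execution reaches ExecutionGraph's scheduleForExecution method: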

```java
// Schedules the executions
public void scheduleForExecution() throws JobException {

    assertRunningInJobMasterMainThread();

    final long currentGlobalModVersion = globalModVersion;

    // A successful transition to RUNNING is what starts the checkpoint scheduler (startCheckpointScheduler)
    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
        // ......
    }
    // ......
}
```

Once the job successfully transitions from CREATED to RUNNING, the CheckpointScheduler is started and checkpoints begin to be triggered.
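The link between the state transition and checkpointing can be modeled with two pieces:

- A compare-and-set on the job status.
- A status listener that starts periodic triggering when the job becomes RUNNING and stops it otherwise.

This is a simplified version of the job-status-listener approach Flink 1.x uses here (CheckpointCoordinatorDeActivator). CheckpointSketch, its JobStatus enum and the CheckpointScheduler class are hypothetical. A Runnable stands in for actually triggering a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical model: a state transition to RUNNING starts periodic checkpoint triggering.
final class CheckpointSketch {
    enum JobStatus { CREATED, RUNNING, CANCELED }

    private final AtomicReference<JobStatus> state = new AtomicReference<>(JobStatus.CREATED);
    private final List<Consumer<JobStatus>> listeners = new ArrayList<>();

    void addListener(Consumer<JobStatus> listener) {
        listeners.add(listener);
    }

    // Atomic transition; listeners are notified only if the compare-and-set succeeds.
    boolean transitionState(JobStatus expected, JobStatus next) {
        if (state.compareAndSet(expected, next)) {
            listeners.forEach(l -> l.accept(next));
            return true;
        }
        return false;
    }

    // Listener that starts periodic triggering on RUNNING and stops it on any other status.
    static final class CheckpointScheduler implements Consumer<JobStatus> {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-timer");
            t.setDaemon(true); // do not keep the JVM alive just for this sketch
            return t;
        });
        private final Runnable triggerCheckpoint;
        private final long intervalMillis;
        private ScheduledFuture<?> periodic;

        CheckpointScheduler(Runnable triggerCheckpoint, long intervalMillis) {
            this.triggerCheckpoint = triggerCheckpoint;
            this.intervalMillis = intervalMillis;
        }

        @Override
        public synchronized void accept(JobStatus status) {
            if (status == JobStatus.RUNNING && periodic == null) {
                periodic = timer.scheduleAtFixedRate(
                        triggerCheckpoint, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            } else if (status != JobStatus.RUNNING && periodic != null) {
                periodic.cancel(false);
                periodic = null;
            }
        }

        synchronized boolean isScheduled() {
            return periodic != null;
        }

        void shutdown() {
            timer.shutdownNow();
        }
    }
}
```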

Deployment comes next; see "How a Job Is Built" and "How Tasks Are Generated" for that part.

With that, we have walked through the whole process, from writing the code to executing the computation.

Summary

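This article was shared from the author's personal site/blog as part of the Tencent Cloud self-media sharing program.
Originally published: 2021-04-22. In case of infringement, contact cloudcommunity@tencent.com for removal.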
