
Understanding How Flink Moves Computation

With distributed frameworks, one phrase we hear all the time is: move the computation, not the data. So how does Flink move computation? Let's take a close look at the ExecutionGraph.

Basic Concepts

ExecutionJobVertex: represents one computation vertex of the JobGraph; each ExecutionJobVertex may have many parallel ExecutionVertex instances
ExecutionVertex: represents one parallel subtask
Execution: represents one execution attempt of an ExecutionVertex
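
To make the relationship concrete, here is a minimal, self-contained Java sketch (toy classes, not Flink's real ones) of how one ExecutionJobVertex fans out into parallel ExecutionVertex subtasks, each of which records its Execution attempts:

import java.util.ArrayList;
import java.util.List;

// Toy stand-ins for the three concepts above; the real Flink classes carry
// far more state, but the one-to-many-to-attempts shape is the same.
public class GraphConceptsSketch {

	static class Execution {
		final int attemptNumber;                     // which retry of the subtask this is
		Execution(int attemptNumber) { this.attemptNumber = attemptNumber; }
	}

	static class ExecutionVertex {
		final int subtaskIndex;                      // position among the parallel subtasks
		final List<Execution> attempts = new ArrayList<>();
		ExecutionVertex(int subtaskIndex) { this.subtaskIndex = subtaskIndex; }
		Execution newAttempt() {                     // one Execution per (re)start
			Execution e = new Execution(attempts.size());
			attempts.add(e);
			return e;
		}
	}

	static class ExecutionJobVertex {
		final ExecutionVertex[] taskVertices;        // one entry per parallel subtask
		ExecutionJobVertex(int parallelism) {
			taskVertices = new ExecutionVertex[parallelism];
			for (int i = 0; i < parallelism; i++) {
				taskVertices[i] = new ExecutionVertex(i);
			}
		}
	}

	public static void main(String[] args) {
		ExecutionJobVertex ejv = new ExecutionJobVertex(3); // parallelism 3
		ejv.taskVertices[0].newAttempt();                   // first attempt of subtask 0
		System.out.println("parallel subtasks: " + ejv.taskVertices.length);
	}
}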

How the Graph Evolves

Source Code

From 一文搞定 Flink Job 提交全流程 (the earlier walkthrough of the Flink job submission flow) we know that the ExecutionGraph is created together with the JobMaster. Tracing that path leads to the ExecutionGraphBuilder.buildGraph method:

......
// topologically sort the job vertices and attach the graph to the existing one
		// the sorted topology, e.g. source->flatMap, filter->sink
		// each operator chain becomes one JobVertex; a single operator is a special one-operator chain
		List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
		if (log.isDebugEnabled()) {
			log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
		}
		executionGraph.attachJobGraph(sortedTopology);
		......
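
As a side note on what sortedTopology contains: a small job like the following hypothetical pipeline (matching the source->flatMap / filter->sink comment above) yields two JobVertex entries after operator chaining; the rebalance() between flatMap and filter is what breaks the chain:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ChainDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements("a b", "c d")                     // source
			.flatMap((String line, Collector<String> out) -> {
				for (String w : line.split(" ")) {
					out.collect(w);
				}
			})
			.returns(Types.STRING)                         // lambda needs an explicit result type
			.rebalance()                                   // breaks the operator chain here
			.filter(w -> !w.isEmpty())                     // filter
			.print();                                      // sink

		env.execute("chain-demo");
	}
}

With default chaining, source and flatMap fuse into one JobVertex, and filter and the print sink into another.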

Stepping into attachJobGraph:

public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

		assertRunningInJobMasterMainThread();

		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
				"vertices and {} intermediate results.",
			topologiallySorted.size(),
			tasks.size(),
			intermediateResults.size());

		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
		final long createTimestamp = System.currentTimeMillis();
		// start from the source operator chains
		for (JobVertex jobVertex : topologiallySorted) {

			if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
				this.isStoppable = false;
			}

			/*
			// each ExecutionGraph node is generated here:
			// first a series of assignments hands the task information to the new graph node, sets the parallelism, and so on;
			// then the node's IntermediateResults are created, as many as the node has downstream consumers;
			// finally, the ExecutionVertex instances that execute the tasks are created according to the configured parallelism;
			// if the job defines input splits, they are set up here as well
			 */
			// create the execution job vertex and attach it to the graph
			// parallelization starts here
			ExecutionJobVertex ejv = new ExecutionJobVertex(
				this,
				jobVertex,
				1,
				rpcTimeout,
				globalModVersion,
				createTimestamp);

			/*
			// all JobEdges are processed here:
			// for each edge, look up the corresponding IntermediateResult and record it as an input of this node;
			// finally, associate each ExecutionVertex with its corresponding IntermediateResult
			 */
			ejv.connectToPredecessors(this.intermediateResults);

			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
			if (previousTask != null) {
				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
					jobVertex.getID(), ejv, previousTask));
			}

			for (IntermediateResult res : ejv.getProducedDataSets()) {
				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
				if (previousDataSet != null) {
					throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
						res.getId(), res, previousDataSet));
				}
			}

			this.verticesInCreationOrder.add(ejv);
			this.numVerticesTotal += ejv.getParallelism();
			newExecJobVertices.add(ejv);
		}

		terminationFuture = new CompletableFuture<>();
		failoverStrategy.notifyNewVertices(newExecJobVertices);
	}
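
To see what this loop leaves behind, here is a hedged sketch (plain Strings standing in for JobVertexID / IntermediateDataSetID and their values, not Flink's real types) of the two maps being populated, and of what connectToPredecessors later finds there:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the bookkeeping in attachJobGraph: each iteration
// registers the new ExecutionJobVertex under its id and publishes its produced
// data sets so that downstream vertices can resolve their inputs.
public class AttachSketch {
	public static void main(String[] args) {
		Map<String, String> tasks = new HashMap<>();               // JobVertexID -> ExecutionJobVertex
		Map<String, String> intermediateResults = new HashMap<>(); // IntermediateDataSetID -> IntermediateResult

		// iteration 1: the source->flatMap chain publishes its result
		tasks.putIfAbsent("source->flatMap", "ejv-1");
		intermediateResults.putIfAbsent("ds-1", "result-of-ejv-1");

		// iteration 2: the filter->sink chain resolves its input, which is
		// what connectToPredecessors(this.intermediateResults) does for real
		String input = intermediateResults.get("ds-1");
		tasks.putIfAbsent("filter->sink", "ejv-2 <- " + input);

		System.out.println(tasks);
	}
}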

The key call is new ExecutionJobVertex. Besides some basic assignments, it parallelizes the IntermediateResult and the ExecutionVertex. Put plainly, it creates several nearly identical IntermediateResult and ExecutionVertex objects, as follows:

// parallelization starts here
	public ExecutionJobVertex(
			ExecutionGraph graph,
			JobVertex jobVertex,
			int defaultParallelism,
			Time timeout,
			long initialGlobalModVersion,
			long createTimestamp) throws JobException {

		if (graph == null || jobVertex == null) {
			throw new NullPointerException();
		}

		this.graph = graph;
		this.jobVertex = jobVertex;

		int vertexParallelism = jobVertex.getParallelism();
		// the effective parallelism
		int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

		final int configuredMaxParallelism = jobVertex.getMaxParallelism();

		this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

		// if no max parallelism was configured by the user, we calculate and set a default
		setMaxParallelismInternal(maxParallelismConfigured ?
				configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

		// verify that our parallelism is not higher than the maximum parallelism
		if (numTaskVertices > maxParallelism) {
			throw new JobException(
				String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
					jobVertex.getName(),
					numTaskVertices,
					maxParallelism));
		}

		this.parallelism = numTaskVertices;

		this.taskVertices = new ExecutionVertex[numTaskVertices];
		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());

		this.inputs = new ArrayList<>(jobVertex.getInputs().size());

		// take the sharing group
		this.slotSharingGroup = jobVertex.getSlotSharingGroup();
		this.coLocationGroup = jobVertex.getCoLocationGroup();

		// setup the coLocation group
		if (coLocationGroup != null && slotSharingGroup == null) {
			throw new JobException("Vertex uses a co-location constraint without using slot sharing");
		}

		// create the intermediate results
		this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

		// parallelize the intermediate results
		for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
			final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

			this.producedDataSets[i] = new IntermediateResult(
					result.getId(),
					this,
					numTaskVertices,
					result.getResultType());
		}

		Configuration jobConfiguration = graph.getJobConfiguration();
		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();

		// create all task vertices
		// move computation
		// parallelize the ExecutionVertex instances
		for (int i = 0; i < numTaskVertices; i++) {
			ExecutionVertex vertex = new ExecutionVertex(
					this,
					i,
					producedDataSets,
					timeout,
					initialGlobalModVersion,
					createTimestamp,
					maxPriorAttemptsHistoryLength);

			this.taskVertices[i] = vertex;
		}

		// sanity check for the double referencing between intermediate result partitions and execution vertices
		for (IntermediateResult ir : this.producedDataSets) {
			if (ir.getNumberOfAssignedPartitions() != parallelism) {
				throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
			}
		}

		// set up the input splits, if the vertex has any
		try {
			@SuppressWarnings("unchecked")
			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

			if (splitSource != null) {
				Thread currentThread = Thread.currentThread();
				ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
				currentThread.setContextClassLoader(graph.getUserClassLoader());
				try {
					inputSplits = splitSource.createInputSplits(numTaskVertices);

					if (inputSplits != null) {
						splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
					}
				} finally {
					currentThread.setContextClassLoader(oldContextClassLoader);
				}
			}
			else {
				inputSplits = null;
			}
		}
		catch (Throwable t) {
			throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
		}
	}
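
As a concrete (hypothetical) example of the fan-out this constructor performs: a JobVertex with parallelism 3 producing one intermediate data set ends up with one IntermediateResult carrying 3 partitions and three ExecutionVertex subtasks, which is exactly what the numTaskVertices loops above produce. A toy re-enactment with stand-in types, not Flink's:

import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {
	public static void main(String[] args) {
		int numTaskVertices = 3;   // effective parallelism
		int producedDataSets = 1;  // jobVertex.getNumberOfProducedIntermediateDataSets()

		List<String> intermediateResults = new ArrayList<>();
		for (int i = 0; i < producedDataSets; i++) {
			// each IntermediateResult is sized with numTaskVertices partitions
			intermediateResults.add("IntermediateResult-" + i + " (" + numTaskVertices + " partitions)");
		}

		List<String> taskVertices = new ArrayList<>();
		for (int i = 0; i < numTaskVertices; i++) {
			// one ExecutionVertex per parallel subtask; this is the unit of
			// computation that will later be deployed to a TaskManager slot
			taskVertices.add("ExecutionVertex[" + i + "]");
		}

		System.out.println(intermediateResults);
		System.out.println(taskVertices);
	}
}

Note also the input split handling at the end of the constructor: splitSource.createInputSplits(numTaskVertices) requests splits to match the parallelism, so each parallel subtask can later be handed its own slice of the input.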

At this point, "moving computation" should be clear: instead of shipping data around, Flink replicates the computation itself, one ExecutionVertex per parallel subtask, and for sources requests input splits to match the parallelism, so that each copy of the computation is deployed to work on its own slice of the data.
