前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink: 两个递归彻底搞懂operator chain

Flink: 两个递归彻底搞懂operator chain

作者头像
Flink实战剖析
发布2022-04-18 13:22:45
9820
发布2022-04-18 13:22:45
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

operator chain是指将满足一定条件的operator 链在一起,放在同一个task里面执行,是Flink任务优化的一种方式,在同一个task里面的operator的数据传输变成函数调用关系,这种方式减少数据传输过程。常见的chain例如:source->map->filter,这样的任务链可以chain在一起,那么其内部是如何决定是否能够chain在一起与chain一起之后如何执行就是本篇文章将要剖析的重点。

第一个递归:JobGraph生成

Flink中划分了四种图:StreamGraph、JobGraph、ExecutionGraph、物理执行图,前两种StreamGraph、JobGraph是在客户端生成,ExecutionGraph在jobMaster中生成,最后一种物理执行图是一种虚拟的图,不存在的数据结构,运行在每一个TaskExecutor中。我们在Flink Web UI中看到的就是JobGraph,如下:

JobGraph相对于StreamGraph,可以理解为优化过后的StreamGraph,将能够chain一起的operator chain在一起,上图将source与filter两个operator chain在一起了,这个步骤在生成JobGraph过程中完成。其具体实现在StreamingJobGraphGenerator中:

代码语言:javascript
复制
private JobGraph createJobGraph() {
        .....
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        configureCheckpointing();
     ....
    }

重点就在setChaining方法中,在里面调用createChain方法,构造JobVertix的同时完成operator chain的操作,createChain方法:

代码语言:javascript
复制
private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

      if (!builtVertices.contains(startNodeId)) {
   //chain 的出边
          List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
   //能够chain在一起的边
          List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
    //不能够chain一起的边
          List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

          for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
      //isChainable判断是否能够chain在一起
              if (isChainable(outEdge, streamGraph)) {
                  chainableOutputs.add(outEdge);
              } else {
                  nonChainableOutputs.add(outEdge);
              }
          }

          for (StreamEdge chainable : chainableOutputs) {
      //能够chain在一起那么遍历下一个节点
              transitiveOutEdges.addAll(
                      createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
          }

          for (StreamEdge nonChainable : nonChainableOutputs) {
              transitiveOutEdges.add(nonChainable); 
      //以不能chain在一起的节点为起始点重新开始往下遍历
              createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
          }

            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

            byte[] primaryHashBytes = hashes.get(currentNodeId);

            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }

            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

      //如果currentNodeId=startNodeId 那么就说明是一个chain的起点,则需要创建jobVertix
     //不是则表示是chain的一部分,只需要创建StreamConfig即可
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart(); //起始chain
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
    //连接边
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
    //currentNodeId属于chain的一部分 
            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

       if (chainedConfs == null) {
                    chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
        }
          config.setChainIndex(chainIndex);
          StreamNode node = streamGraph.getStreamNode(currentNodeId);
          config.setOperatorName(node.getOperatorName());
          chainedConfigs.get(startNodeId).put(currentNodeId, config);
      }

        config.setOperatorID(new OperatorID(primaryHashBytes));

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
  //返回chain的出边
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
    }

整个过程就是一个递归的过程,createChain 过程就是不断寻找一个chain的出边,如果邻接的两个operator(source与filter)能够chain在一起,那么就以下一个能够chain一起的operator(filter)为起点,继续寻找,直到找到不能够chain一起的operator(process1),但是此时并没有立刻返回,而是以当前不能chain再一起的operator(process1)为起点继续往下寻找,直到终点(sink)开始一层一层返回,实际上其构造过程是一个反向过程:sink->process2->process1->(source&filter) 这样的一个过程完成operator chain并且构造JobVertix (可通过debug方式查看其详细过程)。

如何判断两个相邻的operator(StreamNode)能够chain在一起?通过isChainable方法判断:

代码语言:javascript
复制
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        return downStreamVertex.getInEdges().size() == 1 //下游的数据流入只有一个节点
          && outOperator != null
          && headOperator != null
          && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //在同一个slotGroup中
  && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //开启operator chain策略
          && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
      headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) //head 表示的是一个起点
          && (edge.getPartitioner() instanceof ForwardPartitioner) //直接转发方式
          && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() //并行度相同
          && streamGraph.isChainingEnabled(); //默认允许chain
    }

第二个递归:OperatorChain生成

当Execution在deploy的过程中,也就是Task在TaskExecutor启动过程中, 会生成一个OperatorChain对象,在该OperatorChain对象中包含了所有的能够chain在一起的operator(source&filter),其内部会生成一个名为chainEntryPoint的WatermarkGaugeExposingOutput对象,一个将数据输出的对象,其输出有两种形式:

1.函数调用,将数据推送给chain在一起的下一个operator节点(filter中)

2.输出到下一个没有被chain的operator(process1)

那么chainEntryPoint是如何生成的?

OperatorChain的初始化是在StreamTask中被调用的:

代码语言:javascript
复制
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

在其构造函数中调用:

代码语言:javascript
复制
this.chainEntryPoint = createOutputCollector(
                containingTask, //当前的streamTask
                configuration, //chain的第一个节点的StreamConfig
                chainedConfigs, //该chain的所有StreamConfig
                userCodeClassloader,
                streamOutputMap,
                allOps);

createOutputCollector获取当前节点的out,

代码语言:javascript
复制
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

        // 当前operator的网络方式输出(filter->process1)
      for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
          @SuppressWarnings("unchecked")
          RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

          allOutputs.add(new Tuple2<>(output, outputEdge));
      }

        // chain out (source->filter)
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
     //寻找被chain在一起的下一个operator(filter)的out
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
        }

  //最后将当前节点的out返回
    .....

        }
    }

createChainedOperator方法:

代码语言:javascript
复制
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators,
            OutputTag<IN> outputTag) {
        // 调用createOutputCollector 获取当前operator的out
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators);

        // 获取当前的StreamOperator
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
   //作为当前的out 传入setup方法中
        chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);

        allOperators.add(chainedOperator);

  //将被chain的operator(filter)传给上一个operator(source)的out
  //那么在out中就可以直接调用filter的处理source的输出数据了
        WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
        }
    ....
        return currentOperatorOutput;
    }

可以看出整个构造chainEntryPoint的过程也是一个递归的过程,会不断寻找被chain在一起的下一个operator的out,直到下游没有可chain的位置,返回网络out作为最后一个operator的out,然后通过使用当前operator构造前一个operator的out,同样是一个反向构造out的过程(filterOut->sourceOut)。简化一下逻辑代码:

代码语言:javascript
复制
createOut(currentOperator){

    if(currentOperator.isNetworkOut){
        currOut=networkOut();
    }else{
        currOut=creatOut(nextOperstor);
    }
    currentOperator.setOut(currOut);
    preOut=CopyingOut(currentOperator);
    return preOut;
}

最终得到的chainEntryPoint就是headOperator的out,这样在其内部不断的通过out调用operator的方式实现了chain的函数调用链关系。

总结

透过以上分析,operator chain就是将满足一定条件的的operator通过函数调用方式传递数据,避免了数据传输的中间过程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第一个递归:JobGraph生成
  • 第二个递归:OperatorChain生成
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档