首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 是如何将你的代码生成 StreamGraph 的(下篇)

Flink 是如何将你的代码生成 StreamGraph 的(下篇)

作者头像
kk大数据
发布2021-03-13 22:26:28
1.1K0
发布2021-03-13 22:26:28
举报
文章被收录于专栏:kk大数据kk大数据

九、一小段源码

上次说到了所有的算子都会转化成 transformation ,并放到一个 List 列表中,那么今天我们开始遍历这个列表,来生成 StreamGraph。

打开这个类 StreamGraphGenerator,generate() 方法(252行),StreamGraph 生成的逻辑就是从这里开始的。

里面有一个 for 循环,遍历的就是上次说到的那个非常重要的 transformations 列表:

for (Transformation<?> transformation: transformations) {
   transform(transformation);
}

然后看 transform 方法(无关的逻辑被我精简掉了),这个方法的作用是,使用不同的转换器,把算子生成的 transformations,转换成 StreamGraph 。

private Collection<Integer> transform(Transformation<?> transform) {
   if (alreadyTransformed.containsKey(transform)) {
      return alreadyTransformed.get(transform);
   }
   
   @SuppressWarnings("unchecked")
   final TransformationTranslator<?, Transformation<?>> translator =
         (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());

   Collection<Integer> transformedIds;
   if (translator != null) {
      transformedIds = translate(translator, transform);
   } else {
      transformedIds = legacyTransform(transform);
   }

   if (!alreadyTransformed.containsKey(transform)) {
      alreadyTransformed.put(transform, transformedIds);
   }
   return transformedIds;
}

首先,看下这个方法的返回值,是一个 Collection<Integer> 类型,也就是说,转换完之后,会返回本次转换的 id。

首先,要获得一个 translator 转换器,可以看到在 static 静态块里,为每一种不同的 transformation 设置了不同的 translator。

获取到转换器之后,进入 translate 方法中,translate 方法有这样一个方法,getParentInputIds(),这是一个很神奇的方法,他是在递归。

final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

赶紧点进去看看:

private List<Collection<Integer>> getParentInputIds(
      @Nullable final Collection<Transformation<?>> parentTransformations) {
   final List<Collection<Integer>> allInputIds = new ArrayList<>();
   if (parentTransformations == null) {
      return allInputIds;
   }

   for (Transformation<?> transformation : parentTransformations) {
      allInputIds.add(transform(transformation));
   }
   return allInputIds;
}

可以看到,它的输入参数是这个 transformation 的 input,然后 for 循环遍历这个 input,for 循环里面又在调用 transform 方法。这就是在递归调用了。

既然是递归调用,那么递归的终止条件是什么呢?

我一开始也很懵啊,debug 的时候,一直在循环往复,头有点大。静下来仔细 debug 了一下,发现终止条件就是:如果没有 input,那就不走到 for 循环里面来,也就直接返回了,这就是终止条件了。

那么,为什么要搞这样的递归调用?

目的就是,在转换一个算子的 transformation 的时候,要把它的上游先转换掉,也就是要从最开始的那个输入开始转换,这样才能顺利的构造出 DAG。

可能看到这,还是很迷茫,没关系,我们拿出具体的数据说话。

十、我们来 debug 一下

下面这个图是当前 transformations 的三个元素

下面的这个是每一个 transformation 的父子关系,括号里是算子的 id,右边是它的 input。

  • Flat Map(2) - Collection Source(1)
  • Keyed Aggregation(4) - Partition(3) - Flat Map(2) - Collection Source(1)
  • Print to Std. out(5) - Keyed Aggregation(4) - Partition(3) - Flat Map(2) - Collection Source(1)

我们从这个 for 循环开始:

当前 transform 方法中,Flat Map 算子作为入参。

它的调用链是:transform -> translate -> getParentInputs -> 遍历 Flat Map 的 inputs ,然后调用 transform 方法

可以看到当前又是在 transform 方法中,但是输入参数是 Collection Source,也就是 Flat Map 的 input。

然后又是依次进入:transform -> translate -> getParentInputs -> 遍历 Collection Source 的 inputs

这个时候,Collection Source 是没有 input 的,所以本次递归就返回了,开始转换 Collection Source。

是用的 LegacySourceTransformationTranslator 这个转换器来转换的,最终就是做了这么一件事,new 了一个 StreamNode,放入了 StreamNodes 的 Map 中。

StreamNode vertex = new StreamNode(
      vertexID,
      slotSharingGroup,
      coLocationGroup,
      operatorFactory,
      operatorName,
      vertexClass);

streamNodes.put(vertexID, vertex);

那么 Collection Source 就处理完了,由于是递归遍历到根节点,那么肯定是会有重复的,所以,已经转换过的,要缓存起来,放到一个 Map 中,下次遇到同样的,就直接跳过了。

Collection Source 处理完了之后,也就是 Flat Map 的 input 处理完了

下面要回来进入 FlatMap 的 translate 方法了(这就是在递归,处理 Flat Map 时,要先处理 Collection Source,等到把 Collection Source 处理完了,再继续回来处理 Flat Map)

FlatMap 是用的 OneInputTransformationTranslator 转换器来转换的。

可以看到它也是一样,new 了一个 StreamNode ,加入到了 streamNodes 列表中。

但是!它还做了另外一件事,那就是:

它还要处理自己的 ParentTransformation,也就是 Collection Source,来构造一个边 Edge。

可以看到这个边,是有方向的,从 Collection Source 到 FlatMap。

然后把这个边放到 Collection Source 的 outEdge 中;再放到 FlatMap 的 inEdge 中。

这样就记录了算子的流向。

这样,FlatMap 就算转换完成了。放入缓存中。

这样对于 transformations 的 一次 for 循环就结束了。

然后开始处理 Keyed Aggregation ,也是一样的流程,先处理它的 input,从最上面一层层往下处理,这里我们就不细说了。

十一、最终结果

最终生成的 StreamGraph中,重要的就是这个 StreamNodes,一共有四个:

每一个 Node 里面有 InEdge,表示这个节点的上游节点是哪个;有 outEdge,表示这个节点的下游节点是哪个。

还有 sources 表示是源,sinks 表示是目标。

十二、小结

好了,本次的 StreamGraph 的 debug 就到这了。

阅读这部分的代码,给我感触最深的就是,要关注主要矛盾,忽略次要分支,才能把脉络梳理清楚,否则就会深陷泥潭,不仅自己没有成就感,而且还耽误了时间。

当然,生成 StreamGraph 的过程中,还有诸多细节,这里我不打算再深究了,如果日后有什么需要,再来看这块其他的代码。

下一次,就是具体的提交任务的过程了,这个过程需要涉及到 Java 的异步编程,所以再安排一次阅读源码必备知识之 Java 异步编程,拭目以待!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-02-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 十、我们来 debug 一下
  • 十一、最终结果
  • 十二、小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档