00:00
接下来我们其实已经把之前前两个问题都已经解决了,哎,我们现在通过一个一个代码,假如说我们知道看到它的这个执行图了之后啊,我们知道这里边一共有几个任务,然后知道每一个任务的并行度是多少啊,那接下来其实我们就知道了,到底要分配多少个slot对吧,几个slot就可以把它跑起来,其实就是这里边最大的那个并行度,就是当前整个流处理程序的并行度,然后就是我们所要占据的lo资源的数量,现在我们已经解决这个问题了,但现在还有一个问题是。就是flink程序里边它到底划分多少个任务task呢?因为前面我们看到的是这个,我们本来认为每一个算子,每一步操作,它就应该是一个任务啊,就应该是一个task,但是为什么这里边你看到有些情况下会把一些任务合在一起呢?然后有些情况下它又不合呢?到底什么时候能和,什么时候不能和呢?诶,这是我们接下来要解决的最后一个问题,这个问题解决了,大家就知道我们生成当时大家看到提交任务的时候,在web UI上看到的那幅图啊,那个执行计划图到底是怎么回事了啊,要解决这个问题,我们要从程序代码和数据流说起啊,呃,那大家要看看这个就是怎么样一步一步,我们说从这个源代码一步一步转换成task manager的slot上可以执行的那个程序的啊,那这里边首先我们的代码里边呢,给大家说一下代码的结构,对于这个flink而言,它的代码结构其。
01:35
其实就是基于这个执行环境env,然后一步一步链式的调用,对吧,后面就是点点点不停的调用方法就可以了,然后这些能够调用的这些API啊,我们转换的这些方法主要分成三类啊,大家看主要是什么呢?一个就是source。另外一个是从transform中间transform,最后是think,他们从这个命名上大家看的也非常的明确,那主要就是什么呢?South就是数据源嘛,这就是一开始env直接调用的一个方法,直接去读取数据源,得到一个data stream。
02:14
然后接下来的操作呢,就都是基于data stream的操作了,啊,就是我们所谓的这个data stream API了,那接下来的这个转都叫做中间的转换操作,这些操操作就各种各样,你可以做map,可以做reduce,可以开窗做统计,对吧,就是各种各样的方式对它去进行加工处理,计算。而最后呢,还有一个think,就是你做完处理计算之后,到底要输出到哪里,用这个thinkin任务think算子来负责输出,我们之前的think是什么呢?那大家看这里面的think是叫做ADD think对吧?那我们之前有没有think呢?也是有的,我们之前的代码里边最后一步大家看print这里是不是就是一个think呀?啊,就是如果大家仔细看我们之前的这一个这个图的话,你会发现最后一个任务它就叫做think,然后具体的操作是print到标准的控制台输出,对吧?呃,Standard out啊,所以这是这其实就是我们定义的一个输出,所以整个的flink程序啊,所有的这个flink程序,它其实都是包含了这么三部分。
03:24
然后这三部分呢,我们说在flink里边,它本身并没有stage的概念,它就是一个一个的任务,对不对?所以它其实就是你可以从这个代码里边,把每一个任务,我们定义的每一个算子提取出来,然后把它映射成一个所谓的逻辑数据流,就是data flow直接映射成这样的一个流。那这个流呢,就是以一个或者多个S开始,诶,我们也可以多个圆对吧,呃,你你多条流嘛,多条多条流去做合并转换,这个也是可以的,然后可以有一个或多个think结束,你可以有多条输出也是可以的啊,那所以这就相当于什么呢?就是一个有效无环图dag嘛,啊跟我们在这个Spark做任务处理的过程当中定义的这些东西都是一样的,哎,但是这里大家注意不会再去划分stage了,而是怎么样呢?我们直接画出数据流图,它跟。
04:22
代码里边定义好的那些算子几乎就是一一对应的关系,也就是说我们在代码里边你定义了一个Fla map,那其实对应的最后的这个转换操作啊,我们的这个逻辑数据流里边就会有一步操作是Fla map。然后如果后边你定义了一部sum,呃,一部map,同样它也会有这样的一步操作,叫做map,叫做some。这就是完全一一对应的,这就是所谓的流式处理,一切都是流,不划分stage,那它怎么处理呢?就是按照顺序嘛,我们定义的这个数据流图里边不就是一步一步做操作吗?那同样我们这里边也是数据来了之后,数据流动,你先到第一步,我们定义的是SS任务,哎,那你就进SS任务去读取,读出来之后你先做一个Fla map,那接下来S做完你就传递给一个,呃,这个Fla map任务,对吧,他去做转换操作,做完了之后继续进行,进行下一步,直到最后传递给一个think任务,然后输出。
05:21
大家看这个整个这个处理过程只针对当前的这个数据的操作步骤来讲,对吧,他根本不用等其他的数据,也不用等其他的并行任务,一点关系都没有,只考虑自己的流动就可以了。这就是所谓的数据流data flow。啊,这是这个关于link里边程序数据流的一个概念,然后接下来我们有了这个数据流之后,那最后怎么样得到pass manager上可以执行的那个操作呢?因为大家看这个数据流,我们在代码里面定义的时候,只有每一步操作,我并不知道它到底有几个几个并行度,对吧,到底是拆分成什么样子,我也不知道它的这个数据传输的模式到底到底怎么去传完完全不知道。那这个怎么。
06:09
接下来进一步去做细化,做这个拆分呢,那就涉及到了一个概念,叫做执行图啊,这个执行图其实就是大家最后看到的job manager要去生成,交给task manager去做执行分配任务的那张图,这张图就就是前面我们在那个讲并行度的时候啊这张图。这就是一个标准的直行图。啊,这就是你看把每一个这个任务都已经拆开了,对吧?然后中间的这个数据传输模式都已经定义好了,那接下来这个pass manager你就照着每一个任务分配下去,你照着它去执行,然后执行完了之后照着这个去去传输对吧?向下游去传输就完事了,哎,这就是一个执行图,那这个执行图它并不是一下子就得到了,从这个代码里边一下就得到了,那中间包括了几步转换操作,哪几步呢?给大家列出来啊,主要的步骤有四步,这里边有这样的四步,所以我们可以一般情况就是把这个执行图,大家可以认为转换的过程啊分成了四层。
07:12
首先第一层是所谓的stream graph,也就是说呃,流图对吧,就是所所谓的这个数据流图啊,这就是根据我们一开始啊,你调用这个API。Data stream API啊,流处理的API写的代码直接生成的那个图啊,那这个图其实就相当于什么呢?就像我们之前的这个代码里边,我们写的是啊,你首先是这个socket test stream,对吧,这是个source,然后怎么样呢?然后基于这个Fla map,然后filter,然后map,然后K,之后S,对吧?那所以我们画出来的这个最初的数据流图,当然就是第一步是socket,对吧,读读数据源,第二步Fla map,然后下一步filter,下一步map,最后一步sum,呃,这个sum之后,最后一步print输出,对吧,这是think。
08:00
这就是最初一步,一步真的是按照我们想的每一个算子都对应的一个任务,就是这样列出来的,就是最初的这张图,然后接下来呢,还要再去进一步生成。那下一步生成的叫做job graph job graph又是什么呢?它是在当前的这个client,我们在做那个客户端提交的时候,它首先就要对于我们当前的这个代码做一些合并优化啊,这个我们就直接看下一页的这张图,大家可能会看的更更清楚一点啊,但是这个图可能稍微有点小,大家主要知道是什么样子就可以了,就最初我们代码里边生成的图就是按照算算子,每一个算子就是一个任务啊,这个就是直接已经按照根据代码直接生成的啊,然后呢,我们在客户端提交之前,它本身要做一个优化,就是把符合某种条件的某一些任务合并在一起,类似于合并成了一个大任务。
09:01
那大家知道这个优化主要是干什么,那肯定就是方便方便操作吧,对吧,你把这个任务,呃,这个合的少一点,这个比较简单的一些操作,你不要那么麻烦把它拆开,直接就把它合在一起就完事了啊这个就是呃,要做这样一个优化,然后呢,就生成了这个图,叫做job job graph。接下来我们说client会提交对吧,提交给job manager,那job manager接收到job graph之后呢,它会进一步按照我们定义的所谓的这个并行度把它拆开,对吧?啊,所以就是严格意义上来讲,之前我们在这个web UI上看到这个图,大家看到这个应该叫job graph,对吧,我们之前一直管它叫这个执行计划图,其实这个就是标准的概念,应该叫job graph,就是作业图提交之后我们就能看到的这个东西。但是现在还没经过job job manager的处理,最后要执行的还不是这个,因为我们看这个并行的任务没拆开对吧?啊,但是基本上这个架构我们已经能看出来了啊,所以接下来照manager要做的事情是根据并行度把每一个任务分别拆开,然后他们之间的这种传输方式定义清楚,然后这这就相当于我现在是真正一个可执行的任务了,对吧?我要做的操作在这代码里边定义了到底是什么算子,就是做什么操作嘛,然后我的数据从哪个任务发过来,然后呢,做完操作之后我去再发送到下游的哪个任务去,那这些都已经定义好的,这就是一个真正可执行的东西了。
10:38
所以接下来draw生成的这个就叫做真正的execution graph执行图。他会把这个执行图分发给task manager task manager就按照这个来定义自己的每个slot上去执行任务,对吧,然后去把这些任务数据来了之后,把这个计算结果来得到最终的结果,那最终这个task manager上执行的这个东西呢,可能还又要做一个转换,那就是基于这个execution graph呢,要变成一个物理可执行的一个图,那这个就是有时候管它叫做物理执行图,这个其实就不重要了啊,大家看这个结构基本上跟上面这个excu graph是完全一样的,只不过涉及到了一些,呃,就是最后我们部署到task manager上之后的一些物理层面的一些具体的调整,具体的实现啊,那这个一般我们就不去专门去研究了,一般就是说到这个execution graph,这步就够了。
11:36
那大家看到这里边主要,呃,我们得到这个执行图的过程,其实主要有两个重要的操作,一个就是后边我们要把按照并行度把它拆开,对吧?呃,这个我们就知道了,这个每一个并行的任务分配到不同的lo上,然后呢,哎,不同的这个。前后发生的这个任务啊,还可以做slot共享,所以大家知道诶,当前我可以有两个slot,是不是就可以把当前这个任务直接跑,这个作业直接跑起来,对吧?就当前我上面的这三个任务共享一个slot,下面这两个共享一个,最后用两个slot就跑起来了,这个问题我们已经解决了,前面已经讲完了,那另外还有一个问题没有解决,就是。
12:20
大家看到上面还有一步操作,提前已经做过的有些任务可以合并在一起,那这个又是怎么合并的呢?啊,这就涉及到了,我们要先说一说本身前后任务之间,他们的数据传输形式是什么样的。这里边我们提出一个概念,就是说,呃,首先啊,前后发生的这个任务啊,他们先后有顺序的这个任务,他们之间要有这个数据传输,对吧?首先是它们之间肯定是上游会把数据传递给下游,然后其次呢,它们之间本身这个这个算子啊,上下游的算子可以有不同的并行度,这个概念就是说我在定义的时候呢,上游这里边可能有一个map,下游呢,哎,这边有一个sum。
13:08
它们之间本身大家想到我上游假如并行度是二,因为可以单独定义嘛,然后下游并行度是三。那是不是在这个过程当中,呃,大家想想这个我到底应该怎么样这个map任务去往这个萨任务去去分配这个数据呢?啊,当然这个例子举的不是很好,因为some姆任务的话,这里边还涉及到了一个K败的过程,对吧?呃,就是大家理解,呃,类似于有一个基于哈西扣的重分区做沙作的一个过程了,那这里边我们假如说先不考虑这个萨啊,假如我们把这个考虑成这个map后面有一个filter。哎,这个就比较简单了,不涉及到我们想到那个重分区对吧,只是一步操作,现在呢,他们的并行度不一样,那这个怎么办呢。还有有同学可能想,那这个简单嘛,对吧,你这里边来了数据,来了一个A,做完map操作之后,哎,你你随便做一个做一个定义对吧?比方说给到这里边第一个这个filter的这个子任务,然后你来了一个B,这里边并行做计算,计算完了之后你再随便找一个给到它不就完了吗?对吧?哎,那现在问题就来了,我下面再来一个C,你要这么说的话,我当前的这个map任务就永远都传给他,对吧?那这样的话是A和C就都来这儿,B就永远都来这儿。
14:24
那下边的这一个我并行度是三,这个第三个并行的filter子任务,这不就相当于没有利用起来吗。所以这里大家要注意一下啊,就是如果说当前我们涉及到了不同并行度的这个调整的时候,上下游有不同并行度的时候啊,那这里边默认它会怎么样呢?默认会做一个所谓的redistribu redistributing就是做一个呃,重分配,重分区,那这里边假如说只是简单的这种重分区操作啊,它会按照什么模式来来定义,那其实就是一个轮巡的方式,就是我这里边上游啊,这里边来的数据AC,假如后面还有de对吧,那怎么分配呢?那就是A给第一个,这个C给第二个。
15:15
D给第三个下面E再给第一个,就是这样轮询我自己来的这个所有的数据轮巡的发送到下游的每一个啊,就是所有的数据啊,按照顺序发送到每一个下游的子任务里,并行的子任务里面去。那同样我这里面map的第二个并行子任务也是一样,对吧,那这B来了之后,假如说我是先给到这儿了,那下一个是不是就给到他呀,对吧,再下一个给到他,就这样去做轮许。啊,这是这个并行度调整的时候默认的一个行为,另外还有另外一种形式,呃,另外一种形式大家就想到了,就前面我们讲的这个map跟filter之间呢,这个其实应该算什么呀,就是呃,这里边这个模式其实取决于算子的种类和它的这个并行度的调整,啊刚才我们说的,呃,这个map跟filter大家可能会想到了,那你如果说啊,我们这里边的这个map map跟filter没那么没那么复杂,没有这个并行度的转换。
16:14
并行路的调整,如果只是在这里边,前面有两个map任务,后边有两个filter,诶那大家想这个怎么做做调整呢?这里边是不是相当于都是一对一转换呀,对吧?你这里来的每一个元素做完map之后,然后你就做一个filter就完了吗?这个filter是不是不涉及到重分区,类似于杀后这样的操作,对吧?哎,那么在这个过程当中,你就没有必要再让他轮巡去分配了,大家想是不是我就近直接给一个比方说我自己当前slot里边之前不是可以共享吗?我直接就用自己内部的,这个是不就完事了呢?哎,所以这里边的大家看到啊,在这种模式下,它的这种数据传输是什么?我就不要再去做重新分配重分区的操作了,我直接就一对一传递过来,完事。
17:03
所以这种。数据传输的方式啊,就叫做one to one啊,那这种one toone的方式呢,其实就会维护着分区和元素之间的顺序对吧?咱们就按照顺序一个一个来的,然后做完一步操作,直接传到下一个里边去,我们所说的map filter Fla map这些都是one twoone的,它就有点类似于我们讲Spark里边的窄依赖啊,这就相当于联系起来看了啊,其实大数据分布式系统里面这些概念都是相通的啊,那同样对应的我们说还有另外一种情况,那就是要啊这个分区会发生改变,对吧?这这就是所谓的redistribu redistributing有两种情况,一种就是前面我们说的,本来这个算子啊,它的种类应该是one to one的操作,但是呢,并行度调了,并行度调了那就得重分区了,对吧,要不然你那个并行度利用不起来嘛,然后另外还有一种场景是前后发生的两步计算操作呢,它的这一个本身对于数据。
18:05
对的,传输是有要求的,比如说前面我们给大家讲的map完了之后呢,我要分组去做sum,分组做计算,中间我是不是有一步KBY操作啊,大家还记得吧。我们说这个KBY,它并不是一部具体的计算操作,不是一个任务,但是呢,它是一个数据传输操作,对吧?它定义了我们接下来是按照K的那个哈希code去做重分区,嗯,那所以这里边假如说啊,我即使当前的这个并行度前后都是二。那是不是相当于还是会出现我这里边按照这个分区啊,假如说我这里边AC de啊,还是这样,来了之后我算它的那个哈希值,然后再去取模,那就有可能这个就没准儿了啊,就有可能A来了之后,算完了之后我是分配到这儿了。然后。
19:00
这个C来了之后呢,有可能也是分配到这儿了,对吧,然后D来了之后呢,我是分配到上面这这个并行的这个子任务了,然后E来了之后呢,又分配到下面了,这就没准。这是完全按照我们这个基于哈希扣的去做了一个重分区的调整。所以在这种模式下,这是由于我们前后的两个不同的算子,它们中间的数据传输模式就已经定义了,我们不能做one toone的直传,而是必须要做重分区,那至于这个重分区,你到底是这个前面我们说的这个redistributing啊,就是reb那个,那个有一个专门的细化的一个说法,叫做reb重平衡,对吧,就是轮询的那种方式去做reb,还是说基于哈code去做重分区,这就看我们具体定义的这个操作了。啊,那所以这里边的这种这些操作都会引起redistribute这样的重分区,这个过程有点类似于Spark的杀戮过程。
20:01
啊,那所以对于这样的算子前后的这种算子,我们大家会发现这就类似于Spark里边的宽依赖嘛,啊,这是相当于是类似。所以我们就发现了redistributing redistributing这种操作在什么时候会发生呢?两种情况,一种是前后的这个算子之间,它是宽依赖对吧,就是我们所说的它们互相之间涉及到了重分区操作的时候它要redistribu,另外还有一种就是并行度不同的话,它一定会那素BOT,对吧,一定会涉及到一个重分区的过程。那接下来我们就。就是有了这个概念,接下来我们就解决了前面的那个问题,那就是说为什么有些情况下可以把有一些任务合并起来呢?哎,这就提出了一个概念,叫做任务链或者叫算子链啊,这个operator train,那么flink里边提供了一个叫做operator train的一个优化技术,它的做法就是说,就是满足某种条件下,我们就可以把一个两个或者多个这个,呃,算子这个任务前后连接,先后发生的这个算子任务啊合并在一起,合并成一个大的任务,然后就放在一起当成一个任务去做处理计算了。
21:17
那大想想它的这个好处是什么呢?好处非常简单,那就是我们现在做的这个操作,是不是就相当于成了一个本地调用了呀?啊大家想一想前面我们做的这个操作,你看我把后面的这这两个直接合在一起了,对吧?合在一起之后,它俩之间的这个操作,你想我们我们之前本来说的这个就是还有一个中间传输的一个过程,对吧?你不同的任务之间,假如说跨了slot的话,那有可能还要做这个序列化,反序列化对吧?你跨了task manager,还有网络传输的这个成本,你现在如果合合并在一起的话,没有任何成本啊。直接就是一个本地调用对吧,当前是一个任务嘛,就是一个本地调用,相当于就是连续执行了一段代码而已,这个就极大的节省了我们在做这个数据传输的时候。
22:08
呃,耗费的这个系统资源啊,整个的这个性能也会极大的提升啊,这是关于这个任务链的一个设置,那么什么样的情况下可以合并呢?还是没说这个啊,这里边有要求,就是并行度相同的one toone前后发生的这个操作可以合并在一起,所以这里边有两个条件,一个是并行度相同,然后另外一个是one to one操作,所以说白了是什么?这就是我们前面说的,你假如并行度不同,或者说你不是one to one操作,是那种宽依赖,对吧,那是不是最后都会发生redistributing啊,啊,也就是说只要不发生redistributing,它是这个直通的这种传传递方式,我就可以把它合并在一起,当成一个本地调用来用。这就是我们所谓的这个合并任务链的过程。那具体来看的话,大家看到就是这一步操作,我们直接就在前面生成job graph的时候,直接就合在一起了,因为这个很容易判断,对吧,在我们那个客户客户端提交任务的时候,就已经可以把它合在一起了,如果我们判断大家看这这个例子里边啊这个操作。
23:16
为什么这里边后边两个可以合在一起呢?因为啊,我们做了这个聚合之后,后边在做think输出的时候,这是一个forward,一个one to one的传输,对吧,直传的一个传输,然后呢,它的并行度还都是二是一样的并行度可以合并任务链,可以合在一起。那前面我们看这个source和flat map之间,哎,这个这个为什么不能做这个任务链合并呢?因为它有并行度的调整,对吧?并行度从一变成了二,它这里边有一个主balance的过程,所以不能合并,那后边这里边Fla mapb跟后边我们做这个聚合啊,大家看这个KBY之后做聚合这一步操作为什么并行都是二,为什么不能合并呢?因为有KBY对吧?啊,这是基于哈西克做的重分区了,这不是完图案操作,哎,所以它也不能合并。
24:06
所以最后合并之后你会发现啊,本来我们认为是四步任务,然后分开之后应该是你看前面第一个这是并行度是一,后面并行度是二,总共应该是七个任务,合并之后呢,总共就是五个任务了。好,所以大家通过这个例子也就可以解决我们之前给大家提出的这个问题啊,当时我们设并行度是二的时候,为什么最终合起来是六个任务呢?啊,那就是说前面我们看到socket这里边圆,这里边并行又是一,跟后边的这个并,并行度是二,并行度调整了re balanceance对吧?哎,这里边大家看这里边传输re balanceance,它不能合并,这必须分开,然后中间Fla map filter map,大家看是不是one to one操作,而且并行度相同,都是二合并在一起,对吧,它变成了一个大任务,然后最后还有这个aregate,就这里中间我们做了哈希,呃,做了这个KBY对吧,基于哈has code的重分区,所以这里边也不能合并分开。而最后一步操作呢,是因为并行度又调整了,Think的时候,Print我们设了并行度是一,所以这里边又是re balance又得分开。
25:14
最后就分成了四个大的操作步骤,然后具体按照并行度分开的话,就是六个并行的子任务。然后最后我们按照最大的并行度去分配slot,只要有两个slot就能把它跑起来,我们之前提出的问题就全部解决了。啊,这就是这一部分内容。
我来说两句