00:00
现在大家已经有了slot和对应的这个slock共享组的概念了,那接下来我们就再给大家举一些具体的例子,看一看在整个flink任务啊,并行子任务在做分配的时候,它到底跟slo关系又是什么样啊,另外就是跟这个并行度他们的关联又是什么样的。首先大家看一下这左边的这一个图啊,这是一个drop graph,那大家知道这个job graph,这不就是一开始我们从代码里边,根据我们代码里边每一步操作啊,流式处理的结构,然后就直接可以画出来的一个D嘛,啊,所以大家看一下这个D是什么样子啊,这里边我没有具体的操作,就用abcde来代替了。那大家看一下当前我这个当前这个程序这有个什么特点呢。它的特点是它是有两个对应的这个这个分叉对吧,那大家想是不是这代表什么。这是不是代表我有两条流的输入啊,然后接下来是不是两条流还要合并在一起,然后再去做一个操作啊,所以这是一个比较特殊的啊,两条流要合并的一个操作,那接下来我们看一下第一条流这个A,它读取数据的这个并行度是。
01:15
大家注意这是四对吧?所以接下来如果我要分配任务的话,A是不是就应该有对应的四个任务啊,哎,那然后这个B是不是同样也是A,读进来之后下一步操作是B对吧?啊,那B这个也有四个人物并都是四,那与之对应的这个另外一条流C呢?哎,它的并行度只有二对吧?哎,所以它其实就有两个并行的C任务就可以了,然后他们合流合并在一起之后做操作的那个D。也是并行度是四,那最后一步E啊,比方说这个是输出了,它的并行度是二,那大家想一下,当前这个任务最终一共有拆开一共有多少个任务,然后需要多少个slot执行呢?
02:01
哎,拆开有多少个任务,这个就多了是吧?拆开的并行子任务是不是全部加起来啊,对吧?二这就是三个四,然后二加二又是四四个46个任务对不对?16个任务总共需要多少个lo呢?是16个吗?哎,六个是吗?哎,为什么是需要六个呢?啊,大家可能想到,诶,那那你这里边是不是就是两条流嘛,所以说这个需要四个,这个需要两个是吗?是这样需要六个吗?大家注意啊,其实只需要四个就对了,因为什么呢?因为如果要划分不同的slot,分开不同的slot的话,是不是只有一种情况,就是划分了不同的slot共享组的时候,才不能做slo共享啊,那我们说过这个两条流,不同的流必须要划分不同的slot吗?没说对不对,对于flink任务而言,大家想这是不是我根本不管它到底是两条流还是一条流,这是不是就是一个就是一个任务啊,就是dag里边的一个一步操作对吧?那所以大家看C和A是不是也可以共享一个slot呀,因为大家知道这其实就是不同的线程嘛,每一个任务是不是都是不同的线程啊,共享slot只是不同的线程在抢占这个slot资源,对吧?啊就是呃,当前空闲的时候,我就可以有有其他的任务就可以执行了,所以那是不是我可以当前这个lo闲的时候,我读取一个这个A这个这条流里的数据,然后闲了之后我又读取一个C这条流里的数据,你说行不行啊?
03:36
完全可以,对不对啊,这个操作是完全可以执行的啊啊,那当然大家知道A这里的数据读取完了之后,是不是接下来就是做B操作啊,那他就可以直接在当前这个分区内,因为你看这个A1字排开有四个并行子任务,B是不是也是一字排开有四个并行子任务啊,所以这一步操作我就不用做数据传输,直接给到自己这个分区的B就可以了,对吧。
04:02
那如果要是C读进来之后呢?啊,那大家会发现它接下来是不是涉及到要跟A做一个河流,那是不是当前C进来的数据就有可能要发送到不同的分区,下一步的那个D子任务里面去啊,那所以大家看到就是假如我读进一个这个第二条流啊,C这里边读进来一个数据的话,做完之后就有可能要跨slot甚至跨task manager去做一个数据传输,对吧?啊这里边多个箭头,大家注意并不是说一个数据完了之后就要广播啊,这里边说的是这个任务发送数据的话,有可能要给多个不同的这个D的子任务发送数据,对吧?啊,它是这样的一个含义,所以大家看这就是这样的一个分配的过程,所以最终只需要。四个slot是不是就执行起来了?哎,所以这里边就有一个概念,之前我们不是说到整个流处理程序的并行度,用什么来指代它的并行度呢?啊,就是一般情况下啊,就认为的就是所有算子里边最大的并行度,为什么呢?
05:11
是不是就是算子里边最大的并行度就是代表了我们当前需要的那个slot数量啊。也就是我们当前同一时间内正在并行执行的线程数,大家看是不是这样啊,所以我是不是就可以把这个作为当前整个流处理程序的一个并行度,大家看这个概念是不是就都串起来了,对吧?啊,Slot的概念啊,然后这个任务分配的概念,然后还有这个多线程的概念,还有这个并行度的概念,这就完完全都串在一起了,这就是关于这个并行子任务的一个分配的实例,然后接下来再给大家讲一个,呃,就是具体的操作啊,从这个任务提交跟这个集群环境配置啊都相关,然后我们看一看对应的这个任务到底是怎么分配的。
06:00
大家看一下这个图,首先呢,我们先看一下当前这个集群里边有几个task manager。这是有基本配置的,对吧,有三个task manager,每个task manager里边呢,诶是有三个task slot,这个是根据配置文件配置的,对不对,大家看这来源是在flink com yamo里边task manager.number of task slots配了三,所以这里边每一个都有三个啊,大家看推荐的这个值就是。对,CPU的核心数量对吧?啊,当然有同学可能在自己测试的时候发现,呃,在做测试的时候,那个我我默认执行的时候好像跟自己的核心数量不一样,那那这个大家要稍微注意一下啊,你要看自己的那个本身逻辑的,就是CPU的那个逻辑核心数到底是多少对吧?啊,就有可能你的那个是呃,六个六个核啊,但是实际上有12个逻辑核心对吧?呃,那所以这个时候你可能它默认的并行度就是12啊啊那所以大家看现在我总共有多少个可用的slot呢?当然就是三三得九对吧,这是这个初始的状态。
07:09
好,那现在接下来我写了一个work work程序啊,然后做了一个提交这个word count程序呢,在代码里边没有做任何的并行度配置。然后也没有在提交的过程当中,也没有指定任何的并行度,那所以现在我的并行度应该以谁为准。是不是就是配置文件里边的那个派出来怎么点default啊啊,那这个比方说我默认它是一等于一,那最后的结果怎么样。大家看啊,比方说我这里边有主要有三步操作啊,啊前面比方说source进来之后做了一个flat map,对吧,后边又做了一个reduce啊,最后做了一个THINK3步操作三个任务,哎,最后是不是就相当于只占了一个槽位啊,只放到了一个slot上,因为默认有slot共享对不对?哎,对吧,现在并行度是一,然后这个前后发生的三个任务啊,他们都可以共享一个slot,那最终是不是就只占了一个槽位。
08:09
所以大家看这个是不是资源很浪费啊,你如果要直接这么配置的话,我的这个机器资源啊,这里边的这个对应的计算资源都没有利用起来,所以那如果我要充分利用这个资源的话,可以把并行度调大一点,是不是就可以利用了,哎,所以比方说现在啊,我把并行度设成二,这里就有不同的设置方式方式了,首先我可以把这个配置文件里边直接默认的并行图改成二,但是大家会想到这个你如果要改的话。对,那是不是所有都改啊,而且是不是还得重启集群啊,对吧,就是集群里边所有提交的那个job啊,默认平行度都要改啊,所以这个其实是就是最底层啊,所以我们说它的优先级也是最低的,那另外还可以怎么样呢?是不是还可以在提交fli drop的时候,Flink drop的时候给一个杠P参数啊,对吧?诶给一个这个杠P参数,这个就可以指定当前提交的这个drop默认的并行度是多少对吧?啊,比方说这个默认并度是二,可以设置,另外还有一种操作是哎,直接在代码里边全局Env.set paraism2啊,那这是不是就是把我当前这个drop这个代码啊,在代码里面就写死了,病毒必须是二对吧?那他们的优先级分别是什么呢?
09:28
对,如果他们都做了设置的话,首先我看的是代码里边的配置对吧,如果代码里边没配置的话,对看提交时候的这个配置,如果提交的时候也没有参数的话,诶,那看这个集群的默认对吧,而且大家知道就是集群的这个默认是对当前集集群里边提交的所有job都生效的啊,当前这个环境配置和这个提交的时候的这个参数的话,这是只针对当前job对吧?然后另外就是如果你在代码里面写死的话,这个是不是必须如果想更改的话,就必须再重新编译打包,再提交这个抓包才能够重新重新提交才能运行啊,而如果说你这这里边如果是这个提交的时候给参数的话,是不是直接用之前那个抓包就可以了啊,所以这个就是使用的时候灵活度也会有也也会有一些区别啊啊,那这个设置好之后,并行度是二。
10:25
那大家想现在每一个任务并行度都是二,最后是不是有六个并行的子任务啊,对吧,都拆开有六个任务,那占用几个slot呢?对,大家想是不是只占用两个啊,最大并行度吗?因为他们是不是可以共享slot,前后发生的大家看三个啊,前后发生的这三个任务都共享到占据一个slot,最后就只用两个slot。那什么情况下,就是当前这个这种场景啊,我怎么样配置可以让资源利用率最高呢?哎,那大家自然想到,那是不是把这个lo占满。
11:03
并行度设的更大,是不是就资源利用率更高一点啊,哎,所以是不是直接把这个并行度设九当前,是不是就可以把这个资源完全就是利用起来啊,这样的情形大家就想到了,它就是27个任务一字,呃,这不是一字排开啊,就是每三个任务占据一个slot,然后占满所有的slot对吧,Slot共享啊,这就是最终执行的这个状态,当然了,每一个slot具体在执行的过程当中,因为我们还是这个只能执行一个线程嘛。同时是不是只能有一个线程执行啊,对吧?你有可能是在做前面的那个source读取读取数据啊,那也也有可能是在做这个后面reduce,但是你不可能同时发生,对吧?啊就是你在做完一个再做,再做下一个啊,啊那大家会想到实际执行的过程当中,可能还会有其他的一些考量,比方说什么呢?啊,就是假如说我最后这个think啊,啊,我们是直接控制来打印,那有可能我这里边想要一个什么呢?我写入到一个文件里面去,那大家看这里边我们代码写的就是right as csv,对吧?啊有这样的一个写法,那假如说我直接把它写到文件里边的话,就会有一个问题,这里边如果我THINK9个任务去写入到文件里面去的话,那写入之后应该是什么状态啊,对,首先不同的task manager写的文件是不是应该应该是不同的文件啊,首先是。
12:28
除非我给一个远程的,比方说给一个HTFS的一个目录,对吧?啊,首先是有这样的一个考量,然后另外就是在同一个task manager里边不同的线程去同时写一个这个这个文件,是不是这就会出现这个顺序就会就会很混乱啊,另外甚至还会涉及到一个就是我不同的线程同时要去写入的时候,是不是有有可能会出现问题啊。对吧,有可能把之前的那个还没写完的,然后后面那个就直接把它覆盖了,诶有可能会出现这样的一个问题,所以这个就导致有些场景下,我是不是希望让他能够保证写入的顺序,而且要保证它最终不出问题,不出错啊,那所以有一个非常简单的操作,就是对我是不是可以把它的并行度设成设低一点啊,直接设成一是不是接下来我就是最终只在一个文件里边按照顺序一个一个写入了啊,所以这个就是我们在代码里边可以针对每一个具体的操作,每一个具体的算子后边去调set parallelism方法,对吧?这就不是在全局ENV去设置了,而是只针对当前的这个写入right s c SV设置,对吧?那大家看现在他设了这个一之后,最后我们的效果一共的任务是不是就变成了。
13:45
别的并行度还是九,那是不是就相当于二九十八,然后再加上一个,一共有19个任务啊,最终占据多少个slo呢?还是九个最大并行度对吧?哎,那最终的效果就是只有一个slot里边是包含了所有的三步任务,因为thinkk只能有一个嘛,那另外其他的八个slo是不是只有前两步操作啊,对吧?最终的那个SK是全部要汇集到这一个slo上来做输出的。
14:16
啊,这就是我们具体的这个例子啊,关于并行度和slot以及任务分配之间的这些关系。
我来说两句