00:00
我们现在大家已经清楚了运行时弗link在运行时的组件,还有任务提交,任务调度的原理,呃,知道了它里边的很多细节,然后我们也解决了主要的那几个问题,我们知道了flink里边任务到底是怎么并行的,然后我们怎么样确定一个flink程序里边到底包含多少个最后要执行的任务,我们也可以知道最终我们需要多少个slo去执行它,对吧?然后在这里边其实最主要的问题啊,大家会发现我们最后这个需要去了解的就一个是可以合并任务链对吧?可以合并这个operator train,在符合两种条件one to one操作,而且是并且都相同的条件下,前后的任务可以合在一起,这是一个要点。然后另外还有一个就是我们说的可以做slot共享对吧?哎,当前这个所有的这个slot,你要判断它占用多少个这个任务,占用多少个slot资源,其。
01:00
这是最后只要看当前并行度最大的那个就可以了,这里边就会涉及到两个问题,当时我们给大家解释的时候呢,一个说这个合并任务链,这个好处非常明显,对吧,你这个是one two one操作,那那如果说呃,本来就没有必要嘛,对吧,他就是做完一步直接做下一步就可以了,所以我们合并在一起的话,可以节省很多网,这个数据传输的时候,网络的开销啊,序列化反序列化的这个开销,这些就都省下来了,延迟就更低了,这是一部优化,所以弗林克默认是这么做的。那大家可能会想到你如果说我们这这种合并啊,在这种场景下,这里边只是非常简单的三步,如果我们当前的这个任务非常复杂,如果说前后发生了很多步操作,但是呢,他们刚好都符合one two one,而且是并行都相同这个要求。里边呢,有某几部特别特别复杂,里边的计算特别的繁琐,所以在这种场景下,我就会考虑,哎呀,我当前的这一个任务是不是有点太大了呢?
02:08
对吧,大家之前就想过,我们之前那个对于这个呃任务去做调整的时候,我们让他可以做这个slo共享的时候,就考虑是有一些任务,他可能这个CPU密集非常的耗费资源,对吧,我们希望是他能跟其他任务做一个呃协调的工作,所以给他做了这个lo共享,那比方说现在我们是把这个所有的任务是直接合在一起了,但是有时候我可能想到什么呢?我我希望想要把它再拆开啊,当前的这个任务可能太大了,它就只能放在一个slot里面执行。我现在希望让他能拆开,能分配到不同的SC里面去执行,这种场景下我怎么去定义呢?哎,这里边就涉及到了另外的一个所谓的做这个呃,任务调度的一种方法啊,当然就是在flink里边,我们说默认是有一些呃配置的一些操作的啊,就比方说像前面说你针对。
03:11
当前的这个任务到底是分配到优先分配到哪个task manager的这个slot里边啊,这我们说默认情况下,它其实就是只要是空闲的对吧,只要可用的直接就分配,类似于一个随机的分配方式,那其他的策略你也可以去配啊,但这个东西一般在用的时候比较比较少叭较少少见,那更常见的是什么呢?就是我要把这个拆开对吧,当前我合并了任务链,但是我现在不想。让它们全合在一起,我把里边有一个特别复杂的操作,我要把它择出来。这怎么办呢?啊,这里边flink里边给我们提供了这个就是可选的一个方法,叫做大家可以看到有一个方法叫做disable training啊,就是假如说我在某一步操作后边直接点disable training的话,它的含义就是说,诶,当前的这个filter啊,这个任务直接把它摘出来,它前后就是它禁止去合并任务链,对吧,这个含义是,所以如果我加了这个,那就是它前面也要断开,后边也要断开。
04:19
啊,大家想一想,如果说我这里边filter这里给了一个disable training的话,如果我这里边不做打包了啊,大家就直接做一个思考就可以了,下去之后大家可以自己做测试,如果我把filter摘出来,那我当前应该拆成多少个任务呢?啊,大家就发现了,前边这里边这个S还是S对吧?哎,关键就是中间这里边本来是一个任务,然后并行之后是两个任务,他就要拆开了,拆成什么呢?Filter是filter。对吧,然后前边的flat map是flat map,后边的map是map,哎,这就真的完全的拆开了,这是flat map,这是这个future,这是map。
05:04
完全拆开,因为我们要求是filter不参与任务链的合并,前后都得断开,所以现在一个变三个对吧,那两个就变六个了啊,然后再加上其他的四个,那总共就得有十个任务了,对吧?啊,大家下去之后可以自己去测试一下啊,可以做这样的调整。然后另外还有一种情况就是大家可能想到,那假如说我并不想是这个filter前后都拆开,对吧,我可能就是在某个地方,因为就是我当前的这个任务比较多,但是呢,没有某一个就是特别特别复杂,我要把它单独摘出来,只是这个任务太多了,总共这个总量合起来有点大,我想怎么样呢?我想在中间中间某个部位直接把它断开,当前的这个任务链前后还都可以合并,但是呢,我把它一个链想断成两个链,断成两个任务,这可这种情况可不可以呢?也是可以的啊,比方说你假如说说想在这个filter和map这里边断开的话,那怎么办呢?那就在map这步操作的时候,有一个操作叫做有一个方法叫做start new,我可以直接开启一个新的任务链。
06:15
言下之意就是之前就断开,对吧,之前该怎么合并,怎么合并我不管,但是它就是之前这个filter和map之间断开了,从map开始合并一条新的任务链,对吧,还是按照这个规则该怎么合并怎么合并。但是如果对于这个例子而言,那就相当于变成了这个filter flat map和filter合并在了一起,这是一个任务,然后后边断开这个map是另外一个任务,对吧?就相当于一个变两个了,那那最后如果我们再拆开的话,最后就应该是八个人务好,大家下来之后,可以把这个不同的情况再好好的去测一下啊,这是关于合并任务链的一个,呃,就是大家可以自定义的一种这种任务调度的方式啊,那另外还涉及到一个就是前面我们讲到这个slot共享啊,因为大家想了,我把这个任务链拆开啊,拆开这个还还还没完,因为大家想到你拆开之后,我最后所有这个不同前后发生的这个,呃,这个任务它不是还可以就是直接做slo共享吗?你如果slot又共享,又在同一个slot里边的话,然后你又拆成了两个,两个任务,两个步骤,何必那么麻烦呢,你直接合在一起不挺好的吗?你这个拆开就没有意义了,对吧?所以我们拆开的这个想法是什么呢?或许我是想要把它单独放在一个独享的slot上面去执行。
07:44
不要跟别的任务混在一起,也就是说,我现在想打破默认的这个共享slot的机制,我想让某些任务直接就独享自己的slot。啊,这个这种情况怎么样去考虑呢。也有这样的方法啊,弗林给了我们这种灵活的方式,就是还有另外一个一个操作,叫做哈呃,叫做呃,大家看到叫做slot sharing group也呃,字面的意思是这是当前的slot共享组,那也就是说然后里边传一个什么参数呢?大家看这里边传一个string类型。
08:22
这个可以随便给对吧,比方说我给叫A组,这个A组表示是什么含义呢?就是前边这个,我不管从当前的这个算子任务啊,就当前的这一步操作开始之后的,假如说之后不定义别的组的话,之后的所有操作都叫做一个lo共享组叫做A都在这一个组里边。也就是跟前边的操作,比方说我们这个SS任务就分开了,他们属于不同的共享组。那这个共享组又是什么含义呢?呃,既然是叫做共享组嘛,意思就是在同一个共享组sharing group之内的所有操作,他们可以共享slot,就这里边的每一步任每步操作啊,最后我们合并任务链之后的那个,呃,那个任务都可以共享slot做做这个处理,但是呢,不同。
09:16
Slot共享组。不同共享组之间的这个这个任务,他们互相之间一定要分配在不同的slot里边。所以这个就相当于是什么,我就相当于把它单独的分配到了不同的slo里了,对吧?啊,那你假如说我这个flat map就这一步操作,我想让它单独放在一个slo里边的话,那可以怎么样呢?诶,那就是比方说前面这里边是一个slo共享组,对吧?比方说我这个slo共享组,这个叫B,然后后边呢,又是一个S料共享组,还叫B,对吧,就是前后都是同一个共享组,就是它中间这个是共享组是A单独摘出来,那就是除他之外别的这些任务都可以共享,但是就它不行,它单独占用slo。
10:06
啊,所以大家要注意一下啊,这种方式会导致一个什么结果,那就是我们做这个呃需要占占用的资源啊,需要占据的这个slot数量的时候,你考察的时候就要呃就要额外去考虑了,为什么呢?因为不同slot组里边的这个呃任务它是会单独占据slot。所以我们最后需要的资源呢,是要按照每一个组分别去统计它里边最大的那个并行度,然后不同的组再叠加,最后才是我们真正需要的资源数量。啊,这也是在呃优呃在在去具体项目啊,在做优化,在做这个任务调度的时候,有可能会用到的一种方法啊,默认情况下的话,我们不需要对吧,因为大家知道lo共享组是有好处的嘛,对吧,我们保持一个这个作业管道,这个可以提高作业的利用率啊,就是我们资源的利用率,其实默认情况下是不需要去更改的,如果你真的需要的话,有这种方式。
11:07
大家稍微做一个了解就可以了,呃,另外我们再补充一下,就是默认情况下,为什么所有的任务都可以都可以共享slot呢?其实就是默认,如果你不给的话,所有任务其实是在同一个共享组里边,这个共享组它的名字那个字符串啊,那个名字就叫做default啊,所以说你自己接下来给那个共享组的时候,你最好不要直接就给default,因为你如果给default的话,它跟你前面啊,前面那个没定义共享组的时候,那个是同一个组,相当于他们还是可以共享的啊,所以这个大家稍微的做一个了解就可以了。然后另外还有一个关于那个切断任务链啊,这里边也有一个操作,就是前面我们指的是每一个算子可以去切断,对吧?那同样我可以全局直接切断这个叫做Env.disable operator training,就是直接全局把这个合并任务链这个操作全部给它静止掉了,也就是接下来就是。
12:07
真的没有任务链这一步,所有的每一步操作对应的都是我们图里边的一个任务,对吧,然后按照并行度把它再拆分开就可以了啊,这个大家可以下来之后都做一下测试。
我来说两句