00:00
在operator chain的这个处理的这个过程当中呢,呃,当时给大家提到了,就是说在这个生成任务执行计划的时候,他其实就按照我们的这个规则,就是我相邻的这个任务,假如说满足这两个需求,一个是。一个是并行度相同,另外一个是,哎,他们之间是one to one操作数据传输的这种方式,那么就可以把它合并成一个operator chain,对吧?好,那大家说这个operator chain有什么好处呢?三二十。简单来说,其实就是说两个合并在一起了,它的好处就是中间不用再去把数据真正做这种意义上任务之间的那种传输了,对不对?大家想它的本身,我们如果任务之间要传递数据的话。因为我们不知道你这个数据要不要发送到其他分区,对吧?所以正常来讲,任务之间传递数据,它必须要有这个通信的话费,而且是不是必须要做序列化,反序列化呀,哎,这个过程其实还是比较耗费资源的,那假如我就知道你不须要,如果我必须要把它发送到另外的一个slo里边,或者我甚至得发送到另外一个task manager上面去,那那我当然必须得走这个网络传输,我要把它序列化,对吧?这个是没办法的,假如我就知道它就是在当前这个task manager,同样的一个slot里边,同一个线程里边,我还需要把它做做这个序列化,然后再传输吗?诶,那就没必要了,对不对?我直接在这里边就相当于不用做任何的序列化操作,而是直接放在内存里边,我直接调用这个对象是不是就完事了呀,就相当于就变成了一个内部的方法调用了啊,所以这个就极大的减小了我们的。
01:52
在执行过程当中的这个话费啊,所以这是一个简单的一种优化方式,那当然大家就可能就会有疑问了,你这里边默认就把这种符合条件的算子都会合并在一起,那它会不会有,会不会有什么问题呢?
02:08
那假如说这里边我们某一个操作啊,假如说我们某一个聚合操作,它就是非常非常费性的,对吧,它本来就是非常非常复杂了,然后你还要把它前前后后很多的算子跟它合并到一块儿,合成一个大任务,那这个大任务是不是就就有一点特别耗费资源的那种感觉啊,在这种情况下,其实我们更想把它再打散拆开,让它可以有更高的并行度,对不对啊,因为你打散了之后,它就可以做这种slot的这种共享了嘛,你如果合在一起的话,那就没办法,它必须是放在同一个slot,同一个任务里面去执行。哎,所以大家就会想到我们在代码里边能不能手动的控制它,到底什么时候做这个operator train什么时候不做呢?哎,当然flink是给我们提供了这样的一个呃能力的,就是我们可以这么去做,这里边给大家说一下,就是怎么样去做呢?首先env里边有一个另外一个方法叫做大家看叫disable operator training,如果我们掉了这个方法的话,它字面上理解就是说不执行或者说禁止做这个算子链任务链,对吧?啊,就是不要做了,那大家想,如果我们把这个。
03:32
调用了之后它的效果是什么样的,我们还是来测试一下吧,这个口说无凭啊,呃,之前的这个状态我们还是去起一个。呃,这样啊,我们先到对应的目录下边去。呃,大家还记得我们的那个启动集群的那种方式,对吧,是不是直接start cluster,对,直接把这个集群先起起来。
04:11
大家看我们已经提起来了一个task manager哦,这个因为我改过配置,所以是三个slot,大家还记得吧,然后下边是这个可用的就是三个,然后我们这里边啊,为了确定我这里边这个代码没有改过,大家看我这里边这个状态对吧,我还是重新打包一下,呃,大家如果要是觉得那个编译比较麻烦的话,如果要去编译的话,我们是点这个啊scla compare,如果大家觉得这个比较烦的话,我们也可以直接运行一下就好了,对不对?呃,我们先编译一下,然后把当前的这个打包一下,大家看一下状态是什么样的。好,我们这个编译好之后再做一下打包,现在我们打包完成之后,就在这个web UI里边去做一个上传提交。
05:00
这个。Upload。上传之后我们就可以把它做一个提交了,我还是我们的入口类用这个啊streaming。好,这里边给一个参数,Host和port。呃,那大家会看到,其实这里面我们不用去真正提交,因为大家记得我们直接收plan是不是就可以看到他的那个执行计划啊,所以这里边我们就不不专门提交了啊,直接收出来看一下吧,诶大家还记得这个图是吧。这说明我们默认的并行度就是是不是都是一啊,所以如果都是一的话,大家看它最后执行的时候是分成了几个任务去执行的呢?按照我们的那个任务链的合成的规则,它是不是就合成了两个大的任务啊,前边所有的从这个SS读入到呃,Flat map filter map,它们中间是不是都是one to one的这种数据传输的模式,所以到这里它是不是全合并成一个啦,然后之后这个aggregation为什么分开了呢?
06:13
哎,我们讲过KBY之后做啊,对,呃,做那个聚合的时候,它是不是要有一个基于哈西扣的重分区的过程啊,所以这个就不属于弯图案了,所以大家看它的这个数据传输是一个哈希的这种方式啊,那这里边是不是就得分开啊,啊,就得是两个任务了,但是它后边聚合之后到thinkk是不是又是一个one two one的一个操作,所以后边它又合成了一个大的一个任务,所以总共就是两个任务,那大家一定还记得,如果这里边我们把它并行度改成二的话,这个plan是什么样的?对,这个plan是不是因为我们最后设置了THINK1定要是一,而且我们前面那个socket是不是它必须只有一个啊,啊,这是socket的特性决定的,所以中间还是做KY聚合的时候必须要分开,然后是不是前后是因为我们那个并行度不同,导致把它做了一个re balance啊,那大家看是做了一个这样的不是弯图案的操作了,所以任务从这里边要把它重新再切开,所以大家看就切成了四个任务,对吧?哎,是这样的一个方式,这个大家有印象对不对,这是昨天已经给大家讲过的并行度的这个相关的一些东西啊好,现在大家看还是我们看这个并行度是一的时候,这种特性,我们现在要干什么呢?大家记住这个图啊,就两两个大的任务对不对?我们现在。
07:42
加入这么一句disable operator training。这相当于就是说。把我们整个的这个任务链合并的这种方式是不是全部禁止掉了,禁掉了大家猜一下这个出来的plan应该是什么样的?好,我直接先把这里先编译一下啊。
08:02
啊,有同学可能已经想到了,是不是,那就应该是根本没有合并的这种情况出现,假如根本没有合并的情况出现,那是不是应该对我们这里边一个算子,是不是就应该对应的一个方框,一个任务啊,哎,那大家猜一下应该有几个任务。哦,那大家看这里边你不是都已经有这个箭头表示出来了吗?呃,这箭头是不是表示他们之间做了一个operator train的一个连接操作啊,啊,那我们到时候就把这个箭头全打开,那就应该是从socket这个S开始啊,是不是就应该这前面是四个,后边是两个,是不是一共应该有六个任务啊好,我们看一下这个是不是符合我们的预期package一下。再看这里边已经打包完毕,我们把它重新提交一下。好,那大家看我这里面是不是可以直接重新上传这个文件啊,而且上传之后这个文件跟之前一样吗?大家想诶,大家看是不是他又有了一个,尽管叫同样名字,但是它是分别管理的对吧?哎,所以。
09:10
这个时间戳是不是我们刚刚上传的,所以接下来我们把这个。给一下啊。哎,大家想一下,这个收plan生成的是不是就符合我们的预期,就是完全打散的六个这样的一个任务啊,所以大家看前面除了中间这个KBY这里边是基于哈希扣做了一个重分区,其他的地方是什么呀?是不是都是forward啊,One to one的操作对吧?好,呃,这是我们给大家讲到的,就是可以在这里边做这个,呃,Disableer training这样的一个操作啊。那有些同学可能会想到,我必须是全局的把它全打散吗?我可以去单独的定制化,说把中间的某一个算子让它摘出来,把它跟周围的断开连接,把它打散,其他该合并还合并可以吗?哎,当然也可以,这个disable也可以,比方说我们这个filter啊。
10:14
大家看它是不是有一个disabled training啊,呃,跟前面那个不一样,那个叫disable operator training,现在直接就叫disable training,我直接如果写一个able training的话,大家猜一下会发生什么样情况?我先compare一下。会变成什么样,我把这个filter前面的那个没有打散了,对吧,该合并还合并,直接就把这个filter这里加了一个disable training,最后应该是几个任务。三个啊,有同学说三个,有同学说四个,还有同学说五个,到到底是几个?呃,好,等一下我们就来揭示答案啊,我直接打包。
11:03
包完毕,我们看一下是不是符合我们分析的这个预期,还是啊,直接去把它艾特进来。这里面我就不改名了,大家如果想看的更清楚一些,可以改个名儿,对不对啊,这个是代表什么,哪个版本的啊。哦,这里边我必须得给参数啊。好,大家看一下是不是非常符合我们的预期啊,我们我们觉得他应该是四个啊,确实他这个得出来的这个执行计划就是四,前边source跟flat map放在一起,Filter单独是一个,诶,Map是一个,后边的这个aggregation到think又是一个。哎,那有同学可能说了,你这个看起来很奇怪的样子,就是我我单独把这个filter就单独摘出来了,对吧,那假如说我的想法是什么呢?我们直接看这个第二个,第二个大家知道是对,就是六个的完全打散的那种情况啊。
12:17
就假如说我现在的需求是什么呢。我不是想把这个filter单独摘出来,我的想法是我就只想在它前面把这个把它打断就可以了,后边该合并还合并在一起,这个可以做到吗?就假如说我想把这个合并成,比方说前边这是一个,中间它俩是一个,因为这必须得打断,对不对,这个我们没办法合,这个必须得打断,然后后边是一个,我我把它合并成三个任务,可以做得到吗?有点定制化的感觉了是吧?可以做到吗?可以做到怎么做呢?这里边给大家提另外一个方法啊,不光可以disable training,还可以干什么呢?还可以大家看有一个方法叫start new,开始一个新的练,对吧?那大家可能知道开始一个新的链是什么意思呢?是不是就是强行把它前面跟后边断开对不对啊,然后前后分别就是两条链,那至于两条链内部是不是你该合并继续合并啊,那其实就实现了我们这个方法,那大家看,如果我从filter开始强行开始一个新的链,是不是相当于就从前面直接把它断开了?
13:35
好,我如果加了这个start start new,如果把它加出来的话,我们直接还是编译打包一下,大家看一下这个效果是什么样的。我们打包完成还是看一下。应该是一个什么样子啊。还是把这个再做一个上传。上去。呃,我们还是用这个时间戳做区分啊,这是刚刚上传的,看一眼。
14:06
7777。诶,大家看见证奇迹的时候是吧?哎,我们单独把它开启了一个new chi,得到的结果就是从这里是不是就把它断开了,Filter和Fla map之间断开了,对吧?从这里开始,后面开启了一个新的啊,这个operator chain,那后面的这个操作还是正常的,所以就变成了三个任务啊,就是这样的一个状态,大家下去之后可以按照这种方式好好的做一些练习,做一些测试。
我来说两句