00:00
上节课我们已经给大家简单的说了整个在flink里边任务调度的一个原理,那这里边我们会看到,其实具体来说的话,还有很多细节我们还没有完整的说明白,比方说我们之前提到的这个pass manager里边的slot到底是怎么去完成任务的啊,还有就是我们整个那个执行图,它有到底是做了哪些优化,到底是做了哪些调整呢?这些部分都是我们要去考虑的问题,所以这节课我们继续来给大家把一些概念做一个细化的分析,首先我们来给大家详细的说一说sports啊,呃,那大家可能会知道,就是在这个flink里边,我们讲了每一个task manager,他是干活的人嘛,他其实就是一个GVM进程,对吧?啊,之前我们给大家说过的,它就是一个干活的进程。那对应的在干活的过程当中,它有可能怎么干呢?哎,一个进程是不是还有可能做这个多线程处理啊,哎,所以它可能会在不同的独立的线程上去执行一个或者多个他自己的子任务,而这些一个或者多个这个不同的子任务,它到底能执行多少个呢?为了控制它能够运行子任务,并行运行的子任务的这个个数,那么就提出了这样的一个task manager slots的一个概念,就所谓插槽的一个概念。
01:32
那这个。这个slot相当于就是就是task manager上面有固定大,就是固定大小的一组资源的一个一个集合,相当于是这样的一个定义,所以我们说在整个flink集群里边呢,做任务调,做这个资源分配的时候,Resource manager做资源分配的时候,做资源管理的时候,最小的单位其实就是slot,它就是按照有多少个slot来去分配的啊,那那大家会想到那这个slot我们又是怎么去定义的呢?我们当然就是在配置文件里边提前配好的,对不对,或者是在雅安模式下面,我们也是这个创建雅安30人的时候,就可以指定好每一个他manager到底是多少个slot,这个都是我们提前配好的,那这个我们又到底是按照什么东西去配呢?
02:25
呃,一般的方式,其实我们其实就是要按照这个每一个task manager这个机器的性能,它所含有的资源来去配置它的这个slot,因为slot相当于是什么。相当于就是它所有资源的一个子集,这个子集在运行的过程当中呢,就执行一个隔离开的独立的一个子任务,一个线程,对吧,一个独立的线程,所以它其实就是相当于用这个slot把不同的子任务之间做了一个隔离。
03:02
那假如说我们task manager,一个task manager就只设定一个lo可不可以啊,当然是可以的,大家看我一开始的那个默认配置就是一嘛,那这个就代表什么呢?这个其实就代表就是我们一个task manager这个GVM进程里边只有单独的一个线程,对吧?啊,它相当于独占了我们整个的这个呃,Gbm啊,所以这其实就是呃一个一个这种资源分配的一种模式,如果我们觉得诶,它本身资源很丰富啊,它内存很大,CPU数量也很多,对吧?那这种情况我们是不是可以让他同时并行执行更多的任务啊啊,所以在这种情况下,我们就可以考虑对它做一个划分。大家这里要注意啊,在这个slot slot这个划分里边,它其实主要隔离的是什么呢?隔离的就是内存,对这里大家注意CPU本身它是不做隔离的,CPU不同的料之间是可以去共享的,所以大家会发现我比方说啊,我的电脑是四核。
04:12
在这个,呃,我的这个开发环境里边,默认的并行度是四对不对,我如果设八可以吗。啊,其实是可以的,完全可以啊,大家下去之后可以去做这个测试,完全是可以的,它其实就是不同的lo会共享这个G边,呃共享这个CPU,但是内存绝对是隔离开的,所以大家在划分的时候,那你就要去考虑了,呃,我这个内存它是平均分配对不对?直接比方说我这个呃16G内存,如果划分四个lo的话,那一个就是4G,那假如说你这里边这个对它这个划分的这个这个呃本本身的这个东西划分的太小,每一个里边的这个内存占用的资源太少的话,就有可能会出现什么情况。
05:01
务执行任务的时候执行不下去了,对不对啊,直接把把这个呃内存直接撑爆了,这个是完全有可能的啊,所以这个大家是需要去做这方面的考量的,你根据我要执行的任务的这个复杂程度,占用资源的程度,和我本身这台机器它本身所有的资源的这个数量大小,做一个整体的一个调配,做一个划分,确定我这个task manager到底分成几个slot啊,这是大家常见的一个方式啊呃,因为一般情况我们这个内存呢,正常来讲一般是够用的,对吧?现在我们大部分这个机器其实内存还好,那所以为了避免我们这个呃,CPU不同的号之间它还要去共享CPU,导致我们这个可能资源本身这个复用的程度不够高,那我们可以怎么干呢?啊,就是往往就是可以按照CPU的核心数量。然后去划分这个lo个数,所以大家看本身这个idea里边,这个开发环境里边,它默认的并行度就是按照核心数量来定的,对不对啊,所以大家这个也可以作为一个参考啊,大家看一下这个例子吧。
06:14
呃,大家大家看我们这里面这个例子一共有几个任务啊。大家看一下。这呃,首先我们看到是有两个task manager对吧?干活的task manager有两个,每个task manager里边有三个slot对有三个插槽,所以它总共就有六个插槽对吧?呃,那大家能想到它最大支持的这种就是并行执行的任务应该是几个对大家可以想到就是同时支持的,肯定就是能能同时支持六个任务并行啊那大家会想到,哎,这个任务在我们的这个例子里边一共是几个任务呢?他其实任务是只有三个对吧,因为大家看到是不是前面这个s map这个合成一个,这是一个任务,但是这个任务有两个并行的子任务对不对,所以如果我们要细分,细分到这个并行任务里边的话,那相当于它就变成两个了,然后同样我们后面这一步k by window apply这个结合起来的这个任务呢,它也有并行的两个子任务,所以总共我们并行。
07:27
分配之后的任务是不是一共有五个啊啊,所以大家会想到就是说如果我们想呃,充分的把把所有的这个任务都并行去执行的话,那是不是哎,现在我们这个六个slot这个资源是不是完全够用啊,对吧,完全够用,所以我可以把它完全就排开,就这么排开,那大家可能会想到,那是不是说我这里边一共有多少个,就算上这个并行的子任务之后,一共有多少个子任务,我这里边的这个slot就就一定至少要多少个呢?
08:03
那假如说我这里边这个情情景啊,假如说我只有一个task manager,是不是这一个,呃,就是当前我们的这个这个任务就完全不能执行了呢,是不是资源就不够呢。哎,好,那所以接下来我们可以试着调大一下并行度,比方说大家会知道我这里边相当于设置的这个,大家看think的并行度是一,前面两个,前面所有的内容是不是并行度都是二啊,对吧,相当于这样,现在我们把前面的并行度调大,比方说前面从二调整到六。那大家会想到接下来我们所有的并行的,算上并行的子任务一共有多少个?假如我们把前面这部think还是一,对吧,这个这个我没说变就就变成这这里边还是一,然后前面的这些并行度都变成六,那总共应该有多少个。
09:07
啊,有同学已经想到了,那这一个任务source和map合起来是一个任务对吧?并行度是六,是不是就分了六个子任务啊,然后后面这一个是不是也是这是一个任务,分成了六个子任务啊,哎,所以加起来是不是12个,然后再还有一个think一个一个任务,并且度是一,是不是最后就是13个任务啊,那大家想我们那那现在的这个状态资源是不是就不够了呢?能执行吗?是不是必须要有13个lo才能满足我们这个这个需求呢,这个并行度的需求呢。这里面给大家说一下,大家看一下这个图啊,其实是不需要。为什么呢?因为大家其实可以发现啊,就是这里边每一个算子啊,这里边大家看这个s map,这都是算子对不对,他们他们合并起来之后,我们可以也认为它是一个大的一个算子任务吧,不同的算子任务,他们的这个操作的复杂度是不是应该是不一样的呀?
10:12
就比方说我如果就是来了一个数,直接把它map成另外的一个,比方说我就直接加一,那这个操作是不是非常简单,但是如果说大家想到后边这个什么叫key by,还有window开窗,然后在apply apply里面有可能很很复杂的逻辑,对吧,这个过程是不是就会非常非常复杂啊?那所以大家想,你如果把这所有的这些任务都把它当成同样的这个,呃,优先级啊,都是平等的,把它分配到一个lo里边去的话,这种方式好吗。这种方式是不是它会有一些问题啊,什么问题呢?那就是有可能本身操作简单的任务,尽管都是一个任务,有的操作简单,有的操作复杂,操作简单的任务,诶,那真的就是很快就做完,是不是就在那等着了呀,对吧,因为后面都都没操作完的,如果前面的数据来的没那么快的话,它其实大部分时间是空闲的,就在那歇闲着了,歇着了。
11:15
而如果是后边我们这个操作非常复杂的任务,那是不是它就得满负荷运作啊,对吧?它的利用率占用率就要就要非常非常高,所以最后出现的状况就是大家就会想到,这就相当于是闲的闲死,忙的忙死啊,这种情况我们对累的累死对吧?这种情况我们其实是要尽量去避免的,我们其实是想要让这个资源能够更平均的分配到我们的任务上,对不对,然后能够让我们的资源利用率能够更高,这才是我们的目的,所以大家看啊,在这个,呃,就是我们知道在这个所有的算子操作里边,有一些这个source map这样的任务呢,呃,它相对比较简单,我们把它叫做非资源密集型的,它对资源的占用率就不是那么高,对不对,另外一些任务呢,比方说像这个呃,K外之后做window做派这样的一个一些聚合的操作,那这个相对来讲,它就对资源的占用率比较高,对吧?啊,它是。
12:15
一个资源密集型的任务。所以,为了防止出现这种。非资源密集型的任务,他一直处在空闲的状态在做等待,为了让这个slot资源也利用起来。那大家想,我们可以做一个什么事情呢?我是不是可以直接就把所有的任务都分配到每一个slot上面去啊?就是也有闲的任务在你这儿,也有忙的任务在你这儿,然后你自己去看着办吧,对吧,你自己去调配吧,你你该怎么放怎么放,如果这么一放的话,大家看所有的slot之间,他们的任务还是那么的不平等吗?哎,现在就不是了,对不对,只是这里面think它只有一个,那这个没办法,那我们就只能放在一个里边,对不对啊,总有一个是这样的,那至于其他的这些六个并型的子任务,大家就会发现,诶,我同样每一个slo里边都有非资源密集型的任务,也有资源密集型的任务,他们互相之间是不是可以错开去共享这个资源啊。
13:25
啊,所以在这个过程当中,大家看到默认情况下,Flink其实是允许子任务去共享这个slot的,即使他们是不同任务的子任啊,所以大家看到这就是我们前面的这个啊,或者说这里大家注意啊,是不是必须得是不同的这个,呃,算子任务的子任务啊。你假如说是这个,呃,同一个算子任务,我设了这个并行度是六的话,它是不是就不能把同样是s map这样一个任务,呃,这个s map的这个二直接放在这里啊,这是不是就不对了,因为他要他俩必须要并行处理对不对?但是大家想这里边我们的这个并行,我必须要求这里边的这个source map1和这个k by window,还有这个applied,这个一必须要并行执行吗?
14:14
诶好像没有说必须他俩要能同时执行对不对,因为他本来就是不同阶段的操作嘛,本来时间就有前有后,那所以说这个并不影响它的并并行度,所以从这个意义上来讲,大家看到啊,就是说呃,允许这个slot共享呢,它可以有两个好处,一个就是说可以获得更好的资源利用率,对吧?在这个过程当中,你不会出现闲的闲死这个,累的累死那种情况,我可以每一个slot都差不多忙,那具体你到底什么时候忙这个,什么时候忙那个,那那就是你自己去调度,看你的那个执行的顺序了,对吧?呃,到了哪个呃任务来,你去做这个时间片轮转的这样的一个分时调度,呃,那另外还有一个好处是什么呢?
15:04
还有一个好处就是说做manager不是得去拿到这个我们的这个这个执行计划之后,他得去确定我到底当前需要多少个slot吗?那大家看现在它是不是就相当于可以不需要,比方说我们当前这个,它不需要把这些东西全加起来,然后看一下,哎哟,总共是13个,所以我要13个slot,他只要看什么就可以了。他是不是只要看整个作业里边并行度最高的那个算子设置的并行度就可以了呀,我只要满足他的这个需求,是不是别的应该就都能满足啊,因为他们互相之间是不是可以并行啊,对吧,可以放在一个lo里边做slo的共享,哎,所以这个过程其实是就会简单很多,呃,这里边其实大家看到这样的一个结果是什么呢?这就相当于它还有一个好处是一个slot里边就可以保存整个作业的一个所谓的pipeline,一个处理的管道。
16:08
大家看什么叫一个pipeline呢?就是从头到尾的处理过程对不对?哎,我这一个W里边你就source map也有,然后什么key window这个也有,后面这个S也有,整个的就相当于是我们整个任务的一个切片一样,这种感觉对不对啊,所以它是这样的一个处理状态。那有些同学可能就会想了,那那假如说假如说我就不想要,就是说把这个全共享起来,就把它全放在一起去做这个,呃,资源的共享,我就想让一个任务全分开,每一个任务都放在一个lo里边,那可不可以做呢?大家想可不可以做到呢?也是可以做到的啊,这个在flink里边也是可以做到的,只不过这里边你要去做一些额外的设置,比方说我们可以设置一个叫做sharing group,就是相当于你共享资源的一个一个组,把它设置到哪个组里边去设置这个东西啊,那那就可以了,默认我们所有的这些不同的这个算子任务啊,他的这些任务都在同一个组里边。
17:17
啊,所以他就会想到他既然是在同一个sharing group里边,那是不是就是资源都可以共享啊,SW可以共享对不对,那你如果给他设置了不同的sharing group的话,那它就会分开了,就一定要分到不同的地方去,好呃,这个就给大家简单的提一句啊,然后这里大家需要掌握的一个概念,就是说slot指的是一个静态的概念,跟我们的那个并发度其实不一样,对吧?我们设置的那个并发度指的是运行时候真正拆成几个去并行执行,对吧?这是并发度的概念,而slot是一个静态的概念,它是可以执行的并发的能力,对不对啊,这是一个并发执行的能力,大家把这个要区分,好好给大家看一个具体的例子吧,稍微复杂一点的例子啊。
18:09
呃,那大家看这个例子里边,其实我们这大家先看见有几个task manager。一目了然,三个对吧,123每一个里边各有三个slot,对,所以总共我们的slo可用的slot数量是几个九个对啊,所以那大家看一下啊,这里边我们那个呃压文件配置文件里边设定了一个什么呢?啊,是不是设定了一个这个task manager number of task slots啊这个就是直接给了一个一个三对不对啊,这里设定好了,大家看他还写了一句,就是我们推荐的值是什么呢?推荐的值就是对CPU的那个核数对吧,这也跟我们之前给大家讲的是一样的,呃,那这里边我们已经有了这样的九个slot之后,现在我们如果要是有一个任务。
19:01
有一个任务来了之后,我们写了一个word count,对吧?大家看word count,假如我们直接设置了它的并行度是一。他会怎么样呢?哦,那那大家会想到这里边他用什么样的方式设置了这个并行度是一呢?它是不是直接把那个para Lim.default那个配置文件里面设成一了,所以最后它的执行效果就是什么样啊对,大家看九个slot只用了几个,对是不是就只用了一个呀,所以我们大部分的这个资源是不是就都浪费了呀,他把所有的这个任务全放在同一个slo里边去执行了。啊,所以这个过程其实我们其实是不想要这样的一个状态的,对吧?啊,那那接下来大家看,如果要是这个并行度我们设置成二的话,可以怎么样呢?啊,这里边大家先看一下这个设并行度是二有哪几种方法,我们讲过有三种方法对不对?首先默认配置文件,呃配置文件里边这个默认的并行度可以改成二,呃,这是这个优先级最低的那种配置方式,对吧,最底层的那个默认,然后我们还可以怎么样呢?对,是不是啊,这里少了一个那个wrong啊,就是flink run的时候,提交的时候是不是可以加一个杠P参数啊对吧?去可以给它指定默认的并行度,另外还可以指定什么,是不是在代码里边指定,呃,这里边没给大家讲的是,我们前面说到每一个算子后边是不是都可以跟那个set paraism那个并行度设置,这里边如果我们想要把所有的算子全局都给它设置成一个,就像外面做配置的那样,可不可以设呢?
20:42
哎,也可以,大家看怎么设,直接Env.set para啊,这就相当于执行环境设了一个全局的并行度,那就是所有算子都是这个对吧?啊,当然我们全局设了之后,下边的每一个算子还可以设吗?啊,还可以再设对那谁的优先级更高?哎,那就是对单独算子的那个设置优先度,优先级更高,如果后边没设,是不是用全局的这个设置,如果全局再没设,是不是用提交时候的这个配置啊参数对吧?如果这个参数又没有,是不是用这个配置文件里面的默认值对吧?啊,这就是整个的这个流程,那大家看,如果要是全局设成定行度是二的话,那又会怎么样呢?
21:24
啊,其实也很简单,是不是就是所有的任务都分到了两个slot里啊,啊大家看这个效果也不是很好,对吧?那什么样的效果大家觉得会更好一点呢?让我们觉得更舒服一点呢?对,大家可能想到我要充分利用它的这一个并行的能力,它的这个资源的话,是不是我应该把这个并行度设成九啊,设成九的话,就真正的是所有的这个任务都并行在九个lo里面去执行了。啊,他的这个并发能力就得到了最大的利用啊,那这样是一定就是最好的吗?那其实有时候不一定对吧,因为有时候是什么,这里面的think大家知道,这就相当于是我们要做这个要做输出,对吧,最后的这个数据要去放入到写入到外部的一些系统里面去,那大家会想到,假如说我们写到一个。
22:20
文件里面去的话,写到文件里边可能会有一个什么问题呢?假如我们把这个sink并行到九个slot里边,那大家会想到它并行的去写这个文件,有可能会出现什么问题啊对这个这个首先就是说这个相当于我们要写这个数据首先就乱了,对吧?啊这这一片那有好几个文件里边都有,另外你每一个task manager里边本身我们配置的那个文件路径是不是应该是固定的啊。那你如果固定的文件路径下面,它三个并行的线程同时在写这个文件,那他想这个是会不会有问题啊,对吧?啊,这个过程其实是会有问题的,那我们这个过程有可能就会做一个什么操作呢?哎,我们就要把这个think的并行度是不是要调低啊,啊,有可能我们要求你就干脆最后给我输出到一个文件里边就好了,按照这个顺序对吧,挨个给我写进来就好了,所以我们可以把这个。
23:22
并think的并行度,最后是不是给它单独算子上设一个,设一个它是一,那如果设了这个之后,他的状态是什么呢?啊,大家看这个就是它是write csv对吧,写到这个文件里边,然后set的并行度是一,最后状态就是。前面的两个任务所有的并行度都是九对,而只有最后一个S任务并行度是一,所以他就找一个slot放进去,只在那个slot上执行了,对吧?啊,所以大家看其实就是这个并行度,其实就是这样一件事情。
24:00
呃,这是我们这个slot和并行度的一个概念。
我来说两句