00:00
我们前面是提出了这样的三个思考的问题,那我们现在再来回答一下啊,那首先就是怎样实现并行计算,其实我们已经有有思路了,那就是多线程嘛,不同的任务分配到不同的线程上就可以并行执行了,那么不同的线程,那大家想执行的时候是不是需要有不同的能够运行它的这个计算机资源啊,啊,那所以这个资源怎么样去分配呢?这就是我们之前说的。Slot啊,就是不同的slot,它其实就是执行不同的任务啊,跑一个不同的线程,主要就是起到这样一个作用,然后第二个问题就是,那到底我们执行的时候,并行的任务需要多少个slot呢?这个我们当前还没有具体的一个解释,但是至少我们已经知道他的行为了,大家发现在我们提交一个job之后,他其实占用的lo是这个个数,跟什么有关?对,跟当前我设置所有每一步操作啊,每一个任务的最大并行度有关,对不对?诶当时如果我们最大并行度设三,最后其实你即使是一共有七个任务,大家发现那个数出来一共有七个任务,最后是不是只要三个lo就跑起来了啊,那这个到底为什么呢?哎,后面我们要给大家解释这个问题啊,然后最后还有一个问题,就是一个流处理程序到底包含多少个任务呢?那就是我们说的代码里边写出来的每一步操作,每一个那个计算的那个算子调用啊,到底对应着是几个任务呢?到底有些我们看到有一些对应的操作,它还合并在一起了,什么时候可以合并,什么时候不能合并呢?哎,这是后边我们要给大家重点解答的两个问题啊,就是第二个问题和第三个问题。
01:43
那首先接下来接下来我们首先看这个slot相关的这个内容啊呃,那我们首先给大家明确的提出并行度的概念,之前其实我们也已经看到了,在代码里边可以设置并行度,在提交job作业的时候可以去设置一个杠P参数,指定当前作业的执行的并行度,对吧?另外还可以在集群默认集群的那个配置文件里边给一个默认的并行度,那并行度到底是什么呢?
02:14
所谓的并行度,其实就是。并行执行任务的程度对吧?诶,所以大家会发现在flink代码里边,其实是每一步任务,每一个任务啊,都可以在后边来一个点set parallelism,设定它当前的并行度,那所以这个其实指的就是什么呢?当前特定算子的这个任务,它的并行的个数。就是当前任务的并行度对吧?啊,所以这里边就提了一个概念,叫一个特定算子,子任务的个数叫做它的并行度,什么叫子任务呢?就是说大家看这个S啊,我这里边假如说我给他并行度设了二,那大家想我们最终提交啊,然后去运行到那个slot上,执行的应该有几个SS任务呢?
03:08
哎,对,大家想是不是我应该拆成并行的两个S任务啊,对吧,SS1S2,所以这个在有些时候我们就直接把这个叫做它的就是都是同一步操作啊,这个SS同一步操作的并两个并行子任务对吧?啊,所以它的并行任务的个数就叫做它的并行度。那或者这个map map并行度也是二,那家看是不是就要最后我要执行的时候,是不是拆成两个map呀,对吧,这两个map可能要分配到不同的lo上去同时执行,这就是这个并行度的概念,那有时候呢,我们还会提到另外一个概念,就是整个流处理程序,或者说我们整个这条流里边,我们都说它有一个并行度。哎,大家想这个这个又叫做什么呢。整整条流的这个并行度又又指的是什么呢?这说的就是当前所有算子所有任务里边设置对并行度最大的那个就作为当前整条流的并行度啊,这是这个并行度的基本概念啊。呃,然后并行度大家就会发现了,它主要设置的是我们当前呃,并行执行的这个子任务的个数,对吧?而并行执行的子任务呢,又必须分配到。
04:26
不同的slot上去执行对吧,所以接下来它其实跟这个slot概念是密不可分的啊,那slot大家看它到底是一个什么样的概念呢。对于flink而言,Slot其实就是执行一个独立任务,或者说执行一个独立线程所需要的计算资源的一个最小单元,对吧?它就是一组这个计算资源,所以你要具体到我们这个计算机里边的这个硬件设施,硬件概念的话,它主要就是什么呢?就是我给你划分对应的这个CPU和内存资源,对不对?在当前flink的架构里边。
05:06
Slot每每一个slot啊,它的内存是完全隔离开的,也就是说每一个slot相当于这里边主要核心的资源,就是它有一块自己独享的内存,那个自然我们就想到了不同的料之间啊,既然这个内存它是隔离开的,那是不是就相当于彼此之间状态不会影响啊,啊,对吧,也就是彼此之间互不干扰啊,并行不悖,那当前呢,CPU资源是不隔离的。啊,因为大家知道这个CPU是不是有可能会出现什么情况,就是假如说我现在有一个核心啊,一个CPU,然后我要求有两个slot,大家想这就相当于是什么状态。这就相当于我这个一个CPU是不是轮流要啊,就是假如说我两个slo上,这不是相当于可以去去并行执行两个呃线程吗?那是不是相当于我这个CPU就应该做一个时间片的轮转分,呃被被这两个线程分别去调用啊,对吧?啊,供这两个线程同时使用啊,啊所以是这样的一个状态,所以这里边尽管slot并不单独分配CPU资源,那我们这里边推荐大家用什么方式来设置这个slot呢?是不是就是按照当前task manager的CPU核心数量来设置slots啊,哎,当时是不是给大家推荐过这样一种做法,那为什么这这样去推荐呢?
06:32
这样的话,那大家想是不是假如说我是四核的机器,我就直接给设置四个slot,那大家想这个直接他去分配这个资源,那是不是相当于就一个slot上占有一个CPU核心啊,诶对,那这样的话是不是就不用等了啊,对吧,相当于这个就是一个独享的状态了嘛,啊所以这是一个啊,就包括我们IDE啊,Idea里边去开发环境直接执行的时候,默认的这个并行度,你如果不设置的话,默认也是按照CPU核心数量来来定的,对吧?啊,那所以这里边大家看到就是slot,其实就是这样一个资源的概念啊。
07:09
那么在我们具体运行的过程当中,Slot和task manager又相当于是什么样的一个概念呢?啊,那大家就会发现,其实这个task manager其实就相当于是一个GVM进程对吧?就是我们说的啊,一个task manager就是我们在机器上啊,或者说在容器里边启动起起来的一个GVM进程啊,对应的我们不是有给他要分配对应的那个GVM对内存和对外内存嘛,对吧?啊这些都是分配好的啊,它是一个GVM进程,那所以它就可以在独立的线程上执行不同的子任务,对吧?哎,那所以就是当前我一个task manager是不是可以多线程执行啊,那所以那多线程执行,我当前到底能执行几个线程,这靠什么来控制,或者说我当前的这个线程到底执行在什么地方呢?啊,怎么样去分配资源呢?这就涉及到了stop概念了,这里边是不是每一个。
08:10
干成都要给它分配一个slot,分配独立的资源,然后去单独把它执行起来啊,所以这里边每一个task manager,它有几个slot是不是就。限制了当前它能对最大并发的线程数就限制限制死了呀,啊,所以大家看这里边,我们这个并行能力就是靠这个slot的数量来限定的。现在大家再回忆一下我们在flink集群的那个配置文件里边那两个参数,大家再回忆一下,能理解透彻了吗?一个是task,呃,Manager的那个number of task slots,就是每一个task manager里面的那个slot数量,对吧?它指代的是。就是每一个task manager最大的并行能力对不对,那就是我最多能够有多少个多多少个线程啊并行执行,而后面还有一个参数叫做parallelism.default默认并行度,这个指的是,哎,对,就是我job真正提交我的程序要运行的时候,到底我默认的那个并行度是几对吧?哎,所以大家看一个是静态的能力,另外一个是动态执行的时候,我要用多少这个能力,对吧?啊,我有这个能力我可以不用啊,就大家看这个,我有这个task slot我可以空着对不对,哎,我可以放在这儿嘛,所以大家看,这就是这个概念的不同。
09:35
然后接下来要给大家再说一下,就是诶,比方说我们举这个具体的例子啊,大家看这就是前面我们讲到的这个这个程序对吧?大家看这个程序是前边source map,然后后边这个,这是稍微复杂一点啊,KY做分组,后面还有window开窗了,然后呢,有一个apply,就相当于窗口要应用一个对应的计算,对吧?呃,不管怎么样,大家都认为这就是一个窗口计算吧,然后后边呢,又做了一个think think,这就相当于是最后要输出到外部系统,对吧?啊,大家看前面的并行度都是二,最后一步think并行度是一,然后大家发现现在按照我们的观点来看的话,应该有几个任务啊。
10:14
总共的任务有几个?应该有七个对不对?哎,你单独这么数的话,应该有七个,那当然了,后边我们要真正执行的时候,大家会发现最后的任务其实不是七个,是几个呢?有些要合并啊,所以大家看最后是五个任务哪些合并了呢?大家看这个前面的S和map合并在一起了是吧?啊,那这个到底为什么后面我们再解释啊,现在我们看到的就是五个任务,那这五个任务到底我放在几个slot上呢?哎,这里边大家想到的是最简单的方式,是不是就像这里列出来的这个呀,一字排开,每一个任务用一个线程来执行,然后是不是就占用一个slot呀?哎,所以这就是最简单的这种方式,但是大家想我们如果直接这么提交的话,这个任务真的是要占据五个slot才能跑起来吗?
11:09
这个任务,呃,就是当前我们这个drop啊,真的要是五个slot才能运行吗?其实不需要。啊,所以这个大家要注意啊,其实实际执行的时候,它只需要几个就可以跑起来啊,对,两个slo就可以跑起来啊,所以接下来我们再看另外的一种场景,大家看现在我是在之前的基础上把这个并行度是不是调大了,我现在并中度调成几了,大家看一下,对,大家看source map,还有后面这个开窗操作,并行度直接调成六了,所以大家看是不是一字排开,每一个这个任务都有六个并行子任务啊,对吧?啊,这个并行度是六,然后这个think呢,还是一对吧,所以现在它一共有13个。任务就是按照并行度拆开对吧,把并行任务拆分开,一共有13个任务,那他一共要13个lo才能执行吗?
12:03
不需要直接是不是只要六个slot就可以了,大家看就是只要排在这里就可以了,那最终我们执行的时候,那到底一共13个任务,你只有六个slot,那这个任务怎么分配呢。哦,大家看到啊,就是按照这张图的分配方式。这里面就涉及到一个默认情况下,Flink是允许子任务进行slot共享的。所谓的slog共享,就是大家看到不同的任务是不是放在了同一个slot里啊啊,所以接下来就相当于我在这个slot这个资源上,是不是相当于是要运行三个线程啊,这三个线程是不是抢占的资源都是这个slot上的资源,对吧?好,那那另外就是大家看到假如说这里边我这个thinkk只有一个并行度的话,那是不是其他的slot上就相当于没有这个SK任务了,那就相当于是两个任务抢占一个slot资源对吧?放在一个slo资源资源上去执行,然后呢,这里大家要注意一下它,呃,就是我们可以这么做的这个。
13:09
前提是。必须是前后发生的不同的任务才可以共享一个slot。诶,所以大家就想到,假如说我这里啊,这个source mapp这个一,这是这是它的并行一个子任务,对吧,那跟这个source mapb2或者source map3,它可以直接放在同一个slolo里吗?他想这肯定不行,对吧?因为我当时并行度把它拆开,是不是就是为了让它要并行执行啊,你现在又把它直接放到同一个slot里边,那大家想这个同一个slot我同一时间去执行,是不是占占据这个资源,应该是只能有一个线程啊,哎,那所以你如果还要再放在里边的话,那这不相当于又变成同时执行的只能有一个线程,这不相当于没有并行了吗?哎,那所以只要是并行的子任务啊,就是同一个任务,它并行的这些任务必须分开,必须执行在不同的lo上,而先后发生的这个不同的任务,它的这个子任务可以共享,大家想想这是为什么前后发生的这个子任务,如果要是可以共享的话,它有什么好处呢?
14:19
哎,其实这里面大家会发现一个,这里边有一个好处就是什么呢?我们首先第一个好处就是我在一个slot里边就可以保存整个作业的运行的整个管道啊,这是啥意思呢?什么叫整个这个运行的管道呢?这管道是翻译过来的那个概念啊,就是那个pipeline对吧?什么叫pipeline,就是大家看我这里边整个处理流程是不是就是数据来了之后,然后做source,然后做这个对应的这个window开窗计算对吧?然后做SK,然后就输出,那大家想我现在是不是直接在一个source上就有对应的,这这三步任务就全做完了呀。
15:01
那大家都会想,如果说我当前的这个数据中间不去做重分区对应的这样一个调整的话,是不是我直接在exw里边直接就就直接把这个任务就就搞定了呀,是就接下来是不是不涉及到去做这个不同slot甚至跨task manager之间的那个数据传输啊。就不涉及到这个工作了,对吧,那这个效率是不是特别高,诶这是一方面对吧,然后另外还有一方面就是假如说我别的那个task manager挂了,或者说其他的这个slot挂了,那家想对我现在是不是就相当于我并行路直接降低不就完事了吗?并行路降低是不是最终我还是可以靠一个slot就直接可以把它全执行起来啊,哎,所以大家会发现这个就是它有这样的好处,对吧,就可以我们这个整个运行过程当中的这个健壮性啊,以及就是运行的效率都会得到提升。然后另外还有一个非常重要的要素,就是大家想一下前面我们这种一字排开的做法啊。
16:03
一字排开的做法,这是每一个任务都给它分配一个资源,分配一个slot啊,啊就单独的一个线程,就就分配对应的这个slot,这个slot上就执行这一个线程,那会带来一个问题,大家想我们不同的前后发生的不同的任务,它的这个对于计算资源的占用是不是有可能不太一样啊,有可能有些操作是CPU密集型的,有些可能操作就还好,对吧。比方说你像前面这个source map,大家其实知道它合并,即使是合并了两个任务,这个操作是不是也很简单啊,我把数据读取进来,然后map map大家知道一般就是一个简单转换嘛,对吧?哎,我map,比方说这个数乘二对吧,或者说把它这个数据啊打散提炼,那打散那个我们还是flat map,那一般那个简单的map可能连那个操作都没有对吧,可能我们就是非常简单的啊,把这个数据比方说做一个啊,比方说一个string转换成一个int,哎,转换完了,这是不是相当于就根本不耗费CPU资源,一瞬间就搞定了呀,而后边这个开窗计算,大家想它是不是可能对CPU的占用就会非常非常高,那大家想,如果说我们把它一字排开,每一个都单独一个线程占据一个slot的话,就会导致什么情况啊对大家想那是不是我那个数据量很大的时候,就会出现忙的忙死闲的闲死啊对吧,数据量很大的时候,很忙的那个操作啊,那一步计算是不是就卡在这儿。
17:29
这个数据就全做不完啊,然后前面我的这个任务呢,又是快速的读数据,是不是就都堆在这儿,根本就执行不完啊。诶,所以大家就会发现这就很容易出现这种数据堆积的现象,那我可以怎么样解决这个问题呢?大家看我现在做了这个slo共享的话,那是不是就是假如我这里边的那个source mapp很快做完的话,是不是接下来我还可以去执行另外的一个线程啊,那你前面那个source map做完了,那我只要有对应的那个数据来,对吧?我我接下来这个去处理下一步的那个开窗计算也是没问题的啊,所以大家看,这就使得我们每一个lo的资源都可以得到充分的利用,对吧?在执行的过程当中不要出现那种就是等待或者说数据堆积,没有办法执行下一步操作,不要出现这样的现象啊,这就是这个flink底层的一个原理啊,它默认是可以做这个slock共享的,那这里给大家提一句,就是说,呃,那那是不是说前面我们这个图就画错了呢?因为默认lo共享吧,那是不是它不应该一字排开画成这样对吧,那是不是应该。
18:37
大家想这个默认是不是它只占两个slot呀,第一个slot里边应应该是source map1,然后这个window开窗对吧一,然后THINK1都放在这儿,然后第二个slot是这个,呃,Source map2,然后WINDOW2对吧,就只占两个slo就完事了,那那有没有可能我们代码里面出现这种情况呢?是有可能的啊,呃,这个如果要想出现这种情况,那得怎么做呢?我在代码里边给大家写一下啊,这个要要做另外的一个设置,就是需要给配置一个所谓的。
19:13
Slot共享组,这个slot共享组跟我们前面的这个设置并行度的这个操作有点类似啊,都是针对每一步操作,每一个算子都可以在后边直接去追加的,这个追加的方法就叫做点slot sharing group,也就是就是我们分配的这个槽位,对吧,插槽的共享组。那大家看这里边传一个参数,这个参数就是一个string类型的一个参数对吧,它指明的就是当前slot的共享组的名称啊,那所以这个这个共享组名称其实就是随便随便给对吧,比方说我这个叫一组可以对吧,或者说我这个叫红组也是可以的,对吧?啊,这都没问题啊,你随便给一个名就完事了。
20:00
啊,那所以这里面有一个概念,就是说设这个W共享组,这这有什么,就是接下来会有什么变化呢。就是对于不同的slot共享组啊,比方说你看这个,呃,这个sum和这个flat map啊,我设两个不同的这个slot共享组,那个是red,这个是green,对吧,那大家看红组里边的所有的任务和绿组里边所有的任务。大家注意啊,它是共slot共享组对吧,所以是组内的任务都可以共享一个slot。而如果涉及到不同的组之间,那怎么办?红组和绿组之间怎么办?对他们就一定要占用不同的slot,所以意思就是说,假如说这个时候你划分这个,我们当前这个啊,就是source map和这个后面的window操作,如果设了不同的共享组的话,那大家想是不是它就一定要占用不同的lo呀,那是不是就一字排开了啊,所以这个其实是可以做到的啊,你在代码里边做一个这样的设置就可以做到了。
21:08
呃,那那所以这里面的话,大家可能看的还不是很明显啊,就是你像这个lo共享组,这里还有一个还有一个问题就是说,假如我没配的那个默认是啥,对吧?啊,这个大家注意就是默认是跟前边。他的前一步操作那个任务,他的那个共享组是一样的。诶,那所以这里边又又涉及到,那我前面这个直接读取这个当前的数据源,它后面就没有设这个共享组呀,那他的这个共享组又是什么呢。它的这个共享组,如果我们之前没有设置的话,共享组就叫做default名字那个名称就叫做default对吧?啊,就是字符串default啊,那所以你后面都不设,那是不是都跟之前一样,都是default呀,所以大家看默认是不是就是所有的都在同一个共享组里边都可以共享啊,哎,那所以现在大家看我的这个代码啊,这代表什么含义?
22:01
这是不是就是source任务,这是表示一个共享组,对吧?哎,这是一个一个这个default共享组,然后flat map呢,这是一个green,又是一个绿组,又是一个共享组,然后sum任务呢,又是一个red这个共享组,最后这个呃,就是我们这里边这个print啊,打印输出它是哪个共享组呢?大家注意这不是default,它是跟之前的这个任务一样,所以它也是宏组,对不对,对吧,它也是red啊,所以大家要注意啊,这就是如果说你后边不设不设置的话,就跟前边我们这这个对应的这个组是一样的,所以你接下来如果要把它们共享的话,那那其实是什么?是不是这个red和后边的这个print可以放在同一个source里面去共享,然后前边的这个socket啊,这里边S得放在一个里边,对吧,那flat map是不是也得放在一个里边啊。
23:01
那所以大家能想到当前如果我要提交一下这个任务的话,它需要多少个slot吗?大家能想到吗?这个任务需要多少个slot,但是这个还涉及到就是并行度的问题了,对吧,假如说我们在这里,这个就是在这里啊,提交的时候我还是什么都不给,就直接默认。哎,那那大家知道,首先我们看代码里面并行度对吧,然后如果默认没有的话,那就看那个配置文件,那是不是就是一啊,那大家想一下,现在我们需要多少个这个。才能把它执行起来哦,大家其其实就想到了啊,我这里边可以直接给大家把这个再compile再再运行一下,再打包一下,对吧,大家可以看一下这个结果啊。大家想到不同的slot共享组是不是一定要占用不同的slot呀,对吧?呃,所以接下来其实就是说我的这个socket这一步是不是要占用一个,然后面Fla map是不是也要占用一个啊呃,然后后面的这个some和这个print它俩是不是可以共享啊,所以它俩是不是可以占用两个啊啊对,所以接下来我们在代码里边。
24:16
再去ADD new啊,我们把这个做一个提交。好,这是我们刚刚,诶不是啊。好,这个是我们刚刚上传的这个抓包,然后我们还是同样先把这个写好,Post local host port 7777,然后接下来这里面不写,然后收大家看一下,诶大家看这里边出现了一些奇怪的现象,就是前面是不是我们把这个就拆开了呀,大家能想到为什么拆开吗?现在是不是他都必须在不同的lo上执行了,那你说他能是一个任务吗?啊,那肯定不能是一个任务了,对吧?哎,之前我们是一个任务的话,那肯定是不是在一个slo上执行啊,你现在都强行指定它必须在不同的slo上执行了,那是不是必须拆开啊,诶,那所以现在的话它就是必须拆开,那大家会想到现在直接我做一个这个提交的话,当前我应该有几个任务。
25:17
是不是一共有五个任务啊,五个任务要占用几个slo呢?诶大家看是不是执行起来了,已经running执行起来了,对吧?当然这里边大家看到这个直接fail了啊,大家看啊,当然现在已经看不到,大家知道这个执行起来的话,是不是应该本身这里边是。本身应该是只要有四个slot就能运行起来啊,那这里为什么feel大家知道吗?对,大家大家能想到啊,我们看一下这里边的这个log这个日志。大家呃,哎,这个大家看在这儿对吧,大家看是不是connection refused啊对吧,因为我们那个NC是不是没起,因为里边有一个那个so文本流的这个连接啊,所以我这里边把这个NC要起一下。
26:06
然后接下来我还是重新提交一下这个啊,Host local host7777。好,Submit一下。大家看一共五个任务,现在大家注意占用的slot不是两个了,而是而是四个对不对?哎,你要按照不同的那个,大家想现在我总共占用的这个数量按照什么来算呢?大家看都都占用了对吧,而且正常执行起来了,诶那那这个到底按照什么来算呢。之前我们的想法是按照最大的那个并行度来算嘛,那现在是按照什么来算呢?是不是先得先分组对吧?每个共享组里边按它的最大并行度,然后再再把各组的那个最大的并行度是不是叠加起来啊,所以大家看我现在的这个状态是不是是一组,它是一组,下边它是一组啊,所以大家看是不是一加一加二,总共是不是四个slo就执行起来了,哎,这就是我们这个关于slot啊,共享的这样的一个呃,实现和测试,大家可以看看这个任务调度怎么去做的。
我来说两句