00:00
我们接下来再来给大家介绍一下flink里边任务调度的一个原理啊,那前面其实我们都已经知道了,在处理这个flink程序的时候,提交的时候呢,我们主要是先由这个dispat啊,或者说有这个资源管理平台啊,把这个集成起来,呃,那提交的时候主要是要把它给到job manager,那在这个过程当中,Job manager拿到的东西到底是什么,然后他又要做什么样的调整转换,然后把它,呃,就是申请到资源之后,把它交给他manager去执行呢?呃,这里边就涉及到了一个任务调度的过程。大家看一下这幅图,这幅图呢就详细的给我们说明了,从一开始我们写出来的这个源码,Flink的这个程序的code,到最后我们真正转换的可可执行的每一个task这样的一个过程。简单来讲的话,那就是一开始我们先写出这个code的啊,大家知道,呃,首先你是要做这个编译打包对吧?啊,那那其实这个编译打包的时候会生成一个什么东西呢?它会按照我们它代码里边定义的那个处理流程,大家发现我们在代码里边其实就是一步一步,呃,那那其实就是每一步操作都跟在后边对吧?啊,就是SC里边的这种,呃,链式调用的这种方式啊,一步一步往后面做转换,那其实我们已经定义好了一个数据处理的流程啊,那么根据这个原始的代码,直接就可以生成一个叫做逻辑上的数据流图,就是一个data gra。
01:40
啊,这这就这就跟我们整个数据处理的那个流程是一模一样的啊,就是大家能想到啊,我们处理的那个DG嘛,然后接下来呢,呃,就是我们首先是要有一个客户端把这个东西发送出来,这个客户端当然有可能是我们的命令行对吧?呃,也有可能是这个web UI,反正是有这样一个接口,我们要把它做一个发送,那在这个发送的过程当中呢,本身客户端这里就首先会对当前的这个流流程啊,就是我们的这个数据流图data flow graph做一个简单的调整,把有一些可以合并的操作就合在一起。
02:20
所以大家后来发现我们提交那个job之后,你运行的时候呢,会发现它有一些任务是合在一起的,对吧?这步操作在一开始就做完了,然后接下来呢,就把当前已经合并好的啊,大家注意啊,当前这个呃数据流图做合并之后,就得到了一个新的数据流图,对吧?这个数据流图一般情况我们把它叫做一个job graph,然后这个图以及我们这里边要执行的那个抓包啊,所有的这些文件就提交给了job manager。然后job manager这里呢,接下来就会分析我们当前的这个数据流图啊,他就要判断你当前并行度到底是几呢,对吧?然后每一个任务它有几个并行的子任务呢?我要把它拆开,拆开之后大家就想到是不是就可以知道我当前到底有多少个任务,然后它这里面有调度器对吧?然后就可以,呃,可以去判断我到底需要多少个slot,然后怎么样去分配啊,到底怎么样去执行这样的一个任务了啊,那所以接下来我们知道它的流程就是我去申请资源对吧?Resource manager那边去申请资源,申请到了之后,大家看这边我们有两个task manager给它提供资源,对吧?然后这里边的每个task manager里边,注意本身是有三个slot,这是它的一个静态的能力,也就是说我最多可以提供三个slot来执行并行的程序,执行并行的任务,但是我现在这个当前的这个任务啊。
03:52
那这个job是不是要把他的这个静态的能力全用起来呢?诶,可以不用,这就是我们说的,你有多大本事,你可以不用出来呀,对吧,你可以收着嘛,可以这个收,收着力去去做这个事儿,对吧,不不尽全力,所以这里边大家看我们本来有三个slot,但是这里边呢,利用起来的只有两个,只需要占用两个,因为这跟我们当时的那个并行度有关,对吧,你到底需要怎么样去并行,你把那个拆分出来之后,发现我当前需要四个lo去执行,那大家看到job manager呢,就做了一个调度,把它分配给了两个worker上的,每个worker上的两个slot分别分配了一个任务。
04:37
一个task,然后接下来大家看到这个job manager和task manager之间,他们做的这个数据传输呢,哎,那那主要就是job manager这边给task manager,它主要做调度,做分配这个,呃,做做这个任务的调度和管理嘛,所以主要给这边传的,就是比方说。这个deploy一个部署一个任务,对吧,或者说我要去停一个任务,或者说cancel取消一个任务,这些信息是由job manager发出指令发给他manager,另外还有就是我们说他会去管理我们的那个checkpoint的机制,对吧,做存盘的那个机制,那个信息也是照manager这边发出的。
05:15
而task manager这边给他传什么呢?那就是我们当前,哎,就是有一些比方说统计信息,心跳信息,对吧,所有的这些信息要实时的传,传递给这个job manager,他们之间是保持着这个连接的状态,这就是整个。在任务运行执行的过程当中,所谓的这个调度的一个原理啊,整体上来讲就是这样的一个过程。那大家可能就会有有这个具体的问题了,对吧,你这个其实讲解的还是比较泛泛而谈的一个概念吧,那具体来说的话,诶,我们就会想到,到底这里边的这个task跟slot的关系是什么样的呢?哎,我们之前又说过这个并行度,那并行度在这里边的体现又是什么样的呢?
06:04
啊,所以大家如果要是有这样的疑问的话,大家可以先看一下啊,我们这里总结出来这样的三个问题,大家可以先思考一下,就是首先我们要思考一下,在flink里边到底怎么样去实现并行计算呢?这个问题比较简单,我们在代码里面体现的非常明显,就是每一个每一步计算操作啊,就相当于我们在代码里边实现的每一步,每一个那个算子调用啊,每一个运算的那个算子,它都可以在后边去给它设置一个啊parallelism对吧,一个并行度,所以说只要设置了并行度之后,那是不是相当于当前的这个操作啊,这个算子就可以并行执行了,对吧?就是当前数据来了之后,我就可以分配到,呃,不同的啊,同时不同的数据都可以分配到这个,呃,同时比方说我们这个map任务有三个并行度,那那我们来了数据之后呢,你就可以同时分配到这并行的三个map任务里边,同时做操作,这不就并行呢。
07:04
计算了吗?啊,所以这个其实是非常简单的一个实现啊,这个问题我们就解决了,那接下来还会有问题,就是说我们当前并行的这这个任务到底需要占用多少slot呢?这个大家其实也比较好理解,就是那你既然是并行的任务嘛,像我们前面给大家说的,假如说啊,现在这个map任务它并行的有三个,它的并行度设了三,那我们知道当前这个map不就有三个并行的子任务吗?对吧,我们把它叫做这个sub task啊,这个并行的子任务,那如果说我们想要让它并行起来的话,那是不是这三个并行的任务就必须以每一个任务占用一个单独的slot呀,大家想想是不是这样?呃,因为我们说这个slot是独享的一组这个资源单位嘛,所以你想要让这个当前的任务并行,那就必须给他分配单独的资源,你假如他们混起来的话,那那这个就没法没法去并行了,对吧,他们已经混为一谈了嘛,呃,特别是就是假如说我们还是呃,CPU资源你还要共享的话,那到最后你相当于还是做完一个做一个嘛,根本就没有并行。
08:16
所以我们现在想要实现并行,那肯定当前任务它的并行子任务都应该分配到不同的lo上啊,那有同学可能就想了,那这个就很简单了,对吧?啊,那我们结合最后一个问题来看一个流处理程序里面到底包含多少个任务呢?啊,那我们就想了,那这不就是叠加起来吗?比方说这个map,这里边有这个三个子任务对吧,并行的子任务,然后比方说前面我有一部这个filter。哎,那那假如说这个filter我的并行度设置的是二,那我就是三加二嘛,这不就是五个任务吗?对吧,最后执行那每一个任务都要占一个slot,那就占用五个slot嘛,不就这么简单吗。但是其实我们在做这个测试,在部署提交的时候,大家发现具体提交上去之后,你看到生成的那个执行计划图,它里边的任务数量不是这样的。
09:11
首先我们发现有一些任务可以合并对吧?然后另外你会发现他最后占占用的slot数量好像也不是按照这个来来定的,那为什么最后的行为这么奇怪呢?弗link到底底层对这个任务调度有什么优化,有什么默认的规则会导致出现这样的行为呢?啊,大家还记得我们当时如果说一开始并行度是二的话,大家还记得我们的那个word count程序是有六个任务对吧?分配出来相当于总共所有的任务算起来啊,算上这个并行的任务一共有六个啊,那那最后你跑不起来,这个我们认为是很正常的,对吧?资源不够嘛,一开始这个集群根本不够,呃,然后如果说你要是把并行都改成一的话,大家看到有两个任务,但是我们只有一个slot,其实也跑起来了。
10:01
那你说这到底为什么呢?为什么说两个任务可以在一个lo里边执行呢?啊,接下来我们就怀揣着这些问题给大家来讲一讲,啊,剩下的一些具体的概念给大家来再做一个扩展说明。首先这里边要给大家说的是这个关于并行度的概念,我们把这个概念再给大家详细的做一个讲解啊,那首先这里边就是我们说flink程序里边,它的执行是有这个并行的特点,那么这个并行怎么实现的呢?分布式嘛,啊对吧,这个分布式,而且我们每一个task manager上边还划分了不同的slot,诶这个就是涉及到这个并行跟slot的一个关系了,所以说在这种场景下,我们把不同的并行子任务分配到不同的slo上去执行,然后就实现了一个并行的处理,并行的计算,那我们在代码里边可以直接指定每一个算子,就是我们的那个操作,对吧,直接每一步操作,Map filter sum,每一步操作。
11:09
直直接可以指定它的并行子任务的个数,那指定的这个并行子任务的个数就叫做当前算子的并行度对吧?啊,这就是我们所谓的这个parallelism它的一个概念,那大家看到前面我们定义的这样的一个呃,几步操作啊,当然这几步操作里边涉及到一些比较复杂的,我们现在还没有接触,比方说像这个window开窗对吧,做这个窗口计算,呃,大家简单来看的话就是sources,先读数据源,然后呢,Map做一个转换,然后是做开窗处理,最后在think是做一个输出啊类似于这样的几步操作对吧?啊,那对应的这个并行度设设多少呢?比方说前面几步操作我都设了并行度是二,那大家就知道你拆开的话,每一个是不是都应该有两个并行的子任务啊啊,所以这就是当前它这个算子的并行度,然后think,假如并行度我们设了一啊,那大家看它的并行子任务就只有一个,这就是他的并行度。
12:08
那我们想有时候我们也会说整个这个,呃,流处理啊,就是当前整个流,我们说它有一个总的并行度,这个总的并行度又是什么呢。啊,有同学可能说,哎,那就是之前我们在代码里边那个env,直接在环境里边默认设的那个并行度嘛,全局设的那个并行度嘛,啊这里边我们说流处理程序的并行度跟那个概念还不一样,那个相当于是一个每个算子的默认并行度,对吧?当前环境里边的默认并行度跟我们在外部去提交任务的时候给的那个杠P参数是类似的,只不过是写在代码里边了,那那这里边如果说我们说当前的这个流处理程序它有一个总并行度的话,一般指的是。指的是当前所有算子里边最大的那个并行度,就指的是当前这个流的并行度。
13:02
啊,大家先先知道有这样的一个概念对吧?有这样的一个定义,那为什么有这样的一个定义呢?啊,我们马上就会知道,它其实就是跟我们所占据的slot数量有关啊,所以后边我们就要给大家说到,就是到底什么是slot对吧?Task manager跟slot的一个关系,其实前面我们也已经说了啊,所谓的这个slot其实是什么呢?它其实就是当前我们在做这个flink集群里边资源分配管理的一个就是固定大小的资源,资源的子集,对吧?这样的固定大小的一组资源啊,那在具体来讲呢,Slot它主要是按照什么划分,主要是按照独立的内存做划分的,因为我们想你在做这个执行任务的时候,主要需要的资源,那就是内存和CPU嘛,我们现在这个弗link集群架构里边,它对CPU的资源没有直接做隔离,没有做划分,因为大家知道CPU本身你是可以做。
14:03
即使是一个CPU也可以划分这个并行任务,对吧,你可以有多线程吧,那那就是做这个时间片轮转了啊啊,尽管看起来好像并不是真正意义上的并行,但其实可以实现这个效果的,所以在flink里边,它的slo的资源对CPU没有隔离,只是对内存有一块独享的内存资源啊,做了一个隔离,那所以说,呃,比方说现在我们这个task manager有三个slot的话,那就相当于是什么呢?就是把当前这个manager啊,我们自己管理的那个内存,内存资源直接就是manager的那个memory啊,直接把它划分成了三等份,然后分配到了每个独立的lo上面去。然后我们在执行的时候啊,在在执行这个任务的时候,这就有点相当于是什么呢?啊,这就是相当于每一个slot上就可以单独的执行一个独立的任务,然后在这个上面去去跑去运行就完事了,那对应我们的这个宏观的概念,大家可以认为每一个task manager呢,可以认为是一个GVM进程,然后里边执行的每一个任务可以认为是一个独立的线程,对吧?那这相当于是一个thad,那每一个线程大家想你在去执行的时候,是不是要有自己的独立的这个内存资源这一份,为了保证它这个呃,线程之间不互相影响对吧,不互相干扰,我们要给他一份独立的内存空间,那这个就叫做slot对吧,他所占据的占据的这份资源,线程所占据的这份资源就叫做slot。
15:36
啊,那所以这里面其实就是说,呃,大家就会看到啊,就是所谓的这个slot数量,它其实是task manager啊,就相当于内存划分的一个个数,那么划分成多少份,这就相当于决定了一个task manager同时可以并行的执行几个task对吧?啊,所以说这相当于我们是通过这个slot的数量可以控制当前task manager的一个并行处理的能力,所以我们说这是一个静态能力。
16:09
也就是说这个能力有是已经有了,但未必要用起来啊,就直接你可以把它放在这儿,我可以不用,这个是完全没问题的啊,这里边我们就先给大家把这个并行度和帕斯manager,还有这个lot的概念先给大家做一个梳理讲解。
我来说两句