00:00
有了这个概念之后呢,我们再来介绍另外一个很重要的概念,就是所谓的并行度啊,这个其实很好理解,在做作业提交的时候,我们还可以改并行度嘛,那什么叫并行度呢?所谓的并行度其实就是每一个算子,大家知道啊,这不是把这个程序里边的每一个转换操作,转换方法对应到了我们这个data flow里边的算子嘛,那既然是算子是一个operator,它是可以操作的嘛,那最后这个算子是不是就应该对应着一个任务呢?呃,确实大概的想法应该是这样的啊,一个算子就应该对应一个任务,但是最后我们是还要做大数据处理嘛,应该要并行计算,怎么并行计算呢。一个非常简单的想法就是我把这个算子在不同的机器不同的节点上。复制多份。然后那是不是在不同的节点上就都能去执行这个操作了,哎,所以这样的话,我就相当于把这个任务并行去处理了,这样的时候如果一同一个数据来了之后,是不是我我进入这个算子也可以,进入这个算子也可以,只要进入一个都是把它处理完了,哎,这就是所谓的并行计算对吧?这个大家需要区别一个的是在我们这个流处理架构里边啊。
01:15
呃,跟Spark不一样,Spark是分阶段的嘛,它有stage啊,然后杀Le什么的,我们这里边流处理的时候,它本身就是数据流,所以前后的任务它本身就可以并行,什么意思呢?就是假如说啊,我这边的数据。停的来,我这来了一个一,然后又来了一个二,又来了一个三,那么大家知道一通过这个S读进来之后,然后去做这里的这个map转换操作,如果一已经在这儿做map转换了,那大家说这个S会闲着吗?SS肯定不闲着,对不对,他只要把这个一这个数据处理完了,传输出去,传递出去了,那他就可以读下一个数了呀。所以就有可能会出现map正在做啊,就是比方说啊,后边的任务正在做,一这个数据正在处理,而map在处理,二这个数据SS已经在读三这个数据了。
02:07
所以完全有可能我们不同的任务都在执行,但是他们可能运运行的正在处理的是不同的数啊,所以这是一种并行,这种并行叫做任务并行。就是数没有并行开,同一个操作没有在同时处理,对吧,那就只是在一个这个节点上去处理,但是呢,不同的操作我可以把它排开同时处理,这是任务并行。那另外还有一种并行是什么呢?大家知道任务并行,你这个还是有瓶颈嘛,你数据如果特别多的时候,你还得一个一个嘛,只是前后发生的这个前后执行的任务可以同时执行,那你同样都是在SS这里的话,你还是得处理完一才能处理二,处理处理三,如果我一下子来了1亿个数,你你这个SS还是得一个一个等着吗?那怎么办呢?把它SS复制成多份一字排开,那这样的话接下来就变成什么了。
03:06
那就变成了我的数据来了,一可以分配到SS1这个并行任务里边,二来了之后可以分配到SS2这个并行任务里边,诶那家看现在是不是这个就数据都可以同时由同一步操作,我们说的同一个算子。直接同一时间执行了,哎,所以从这个意义上,我们可以认为并行有两种概念,一种叫任务并行。就是我们说的前后发生的不同任务,他们是同时处理数据的,另外呢,还有一个叫做数据并行。就是说同一个操作,同一个算子。可以把它拆成多份,同时处理多个数据。在大数据场景下,我们所说的并行更加关注的是数据并行,对吧?啊,当然任务并行也也很好,你像Spark那样的话就是,呃,前后不同阶段的任务也没法并行,你必须一个阶段处理完了之后才能处理下一个啊,啊,所以这个概念大家还是要区分一下,那总结起来给一个标准概念的话,就是每一个算子一个operator,它对应着任务是什么呢?它可以对应一个或多个任务,有时候我们把这个任务就叫做并行子任务。
04:20
就是sub task啊啊,这这个sub task和task概念我们不做呃强制的区分啊,一般情况下我们所所说的任务就是最后在task manager上单独执行的这一个东西,对吧?所以有时候我会把它叫做并行子任务,这个就听着非常的非常的明确了啊,一看就知道是单独的这一个啊,那对于这个统一的SS这步操作,我们就把它叫做SS算子啊,这样的话就是它包含可能包含多个并行子任务,那这些子任务可能会怎么样去执行呢?诶,那之前我们就说了,每一个task manager不是可以认为是一个GVM进程吗?那这个进程上是不是就可以多线程去处理啊,啊,所以这些并行子任务可以在不同的线程,当然也可以在不同的机器,不同的task manager,对吧?那不同的task manager可以是不同的物理机,也可以是不同的容器,它们都可以完全独立的运行。
05:15
这就是所谓并行计算的概念。那什么叫做并行度呢?那就是当前并行计算某一个特定算子,它做并行计算的子任务的个数就叫做它的并行度parallela。通过这个定义,大家可以明确的看出来,定形度针对的是什么。并行度针对的是算子对吧?啊,所以它其实针对的是我们这里边的某一步操作而言的,言下之意,那是不是一个程序里边一个作业里边的不同步骤不同算子。是不是可以设不同的并行度啊,哎,所以大家看前面几个并行度这是几啊,拆开之后都是并行度是二,而最后一个S有可能大家看我直接给它设置并行度是一,这样的话就相当于前面都是并行两个任务在做处理,同时处理的最后一步呢,我需要汇总在一起,在一个。
06:14
Think任务里边去输出。啊,这个其实有时候还是挺常见的,因为大家知道,呃,Think这步操作,比方说我要写入到文件,我不想把它分布式的写到不同的文件里,对吧,我就想汇总到一个大的文件里边,那是不是最后我就应该把它复制,就是只有一个这个任务啊,你如果并行执行的话,那不同的task manager在执行,那肯定是本地不同的文件嘛。所以这个就是我们可以看到这个它有一些实际应用的场景啊,它确实是这样去去做的。呃,那并行度在代码里边怎么样去设置呢?其实前面我们讲讲到并行度的时候说了啊,并行度是针对每一个特定算子而言的,所以在代码里边我们可以针对每一个真正意义上的算子对应的那个方法调用后边跟上一个并路的设置啊前面我们也说了Fla map本身是一个算子啊,那所以我们可以在它后边用that parallelism去做一个并行路的设置,就可以随便给一个数字就可以,好,那像这个K派,我们知道它本身就不是一个算子,所以这个就不能做设置,对吧?而其他的你像这个sum后边。
07:25
像这个print后边他们都是可以直接去做这个并行度设置,这是把里边设置的方法啊,那另外呢,我们会发现其实在整个这个应用程序里面啊。因为本身也有一个that这样一个方法,这样表示什么含义呢?就是全局的,在整个这个程序里边啊,整个这个应用里边,全局的把所有算子的并行度设置为一。啊,那有同学可能想了,你既然这里可以全局的设,那后边假如说我在这儿又设了一个这这怎么办呢。
08:02
啊,这就有一个设置并行度优先级的一个问题了,啊,设置并行度最优先的是什么呢?是代码里边每一个算子,它本身的设置的并行度。就是这里边如果单独你给萨姆设了一个二的话,那么它的并行度就是二,如果这里没有设置,那就要看。当前全局有没有设置,如果设了一哦,那当前所有的算子啊,没有设的并行度就都是一,其实啊,这个全局在这里边直接设一个并行度,这个并不提倡,为什么呢?因为大家知道这相当于hard code啊,直接把它定死了嘛,现在我们直接提交作业的时候,其实就受到了很大的限制,你即使是提交作业的时候,想要让他给他分配更多的资源,也分配不了,因为他并行度只能拆这么多的这个并行任务嘛,对于每一个算子而言,是有可能你需要去单独定义的啊,就当前这个这个算子,呃,可能我就只希望它在用一个任务来执行,就像最后我只希望写入到一个文件里边,那你就单独的把它设成并行度是一,这是可以的,但是全局的这个设置就没有必要。
09:07
因为它等价于什么呢?就等价于我们在这个作业提交时候给定的这个杠P参数,大家还记得这里边这个parallelism在命令行提交的时候,其实就是一句放屁对吧。P2,那么这就相当于什么?就是对当前提交的这个作业。全局指定了一个并行度是二,跟在代码里边直接写死这个en nv parallelism2是一样的,效果是一样,那如果说我在代码里边写了,也在外面这里边设置了,那以谁为准呢?以代码为准,因为大家知道你这里提交的时候只是外部的一个设置,而代码呢,是运行的时候的一个本身设置啊,所以代码的优先级是最高的啊啊,所以总结起来的话,我们就是先看每一个算子它的并行度的设置,然后如果没有的话,看代码里边env环境的全局设置,如果这里又没有的话,那么看作业提交时候给的杠P参数的设置。
10:11
如果这里也没有呢?哎,这就回到我们最前前面说的那个了啊,如果这里也没有的话,那是不是就需要看集群环境comungf flink压文件下边有一个配置的参数叫做parallelism.default。这里设置的这个不是整个集群里边所有提交的作业最终默认的那个并行度,当然了它的优先级就最低,因为它是对集整个集群都生效嘛,哎,所以我们就看到了啊,就是越精细,你如果控制的范围越小,它的优先级就越高,控制的范围越大越灵活,哎,这样的话它的优先级就越低啊就是大概就是这样的一个状态,那一般情况项目实践的时候推荐是什么呢?呃,其实集群里边这个配置,这只是一个默认配置而已,一般我们不会以这个一为准,对吧,而且也不会直接把这个集群里边直接就定死,所有的并行度都一样,肯定是针对每一个作业而言的啊,要单独指定的,那指定的时候呢,一般就是如果有特殊需求的算子,那我们单独的分配一下,代码里边分配一下。
11:21
那没有需求的呢,我们全局就不要去设置了,而是在提交作业的时候在这里加杠P参数,那就非常灵活嘛,你假如说当前这个资源不够用了,我还可以把它停掉之后再重启,重启的时候给一个小点的并行度,诶这样肯定占的资源就小了嘛,或者说如果现在我们这个数据量变大了,那怎么办呢?呃,之前资源不够用了,那我还可以。停掉之后以更高的并行度重新运行,哎,这些都是可行的,这就是flink非常灵活的地方,并行度的设置和它的优先级别。
我来说两句