00:00
讲到这个并行度设置的话,我们这里边就可以做更多的测试了,可以各种更改并行度,可以在这个代码里边,呃,把不同算子的这个并行度做一个做一个设置啊,也可以这个全局啊,提交的时候改不同的并行度去做这个设置,这都是可以的。我们这里要给大家讲的是打如。我这里提交之前这个打包的时候,它里边没有任何配置,对吧,没有任何配置的话,那当然就是以这里的这个精度为准,如果这里也没有给的话,收plan大家知道这是以什么,以集群为准吧,以集群配置文件里面那个默认的一为准嘛,所以并行度就是一。然后这里有一个问题,大家看画出来的这个dag啊,大家看这个任务当前的这个数据流图,我们会看到它并不是每一个我们认为的那个算子就是一个单独的方框,因为大家感觉这个好像就是呃,一个方框就是一个任务啊,提交的时候大家也发现了,如果并行度设一的话,确实最终一共就是两个任务嘛啊,那那大家会看到这一个方框就是一个任务啊。
01:03
那这个东西为什么他不以每一个算子作为一个任务去提交呢?哎,这里就涉及到一个问题,我我们看到中间有一个箭头把两个不同的算子联合在一起了,好像好像把它们连接在一起了,所以这样的话,算子和最终的任务就不再是一一对应的关系了,呃,就是抛开我们所说的那个并行子任务啊,就即使是不考虑并行,它也不是这个一一对应的关系了,有些前后的这个算子可以合并在一起。这又是怎么回事呢?啊,要要理解这个话题,而且大家看到这是一的情况是吧。我们再再换一个来测一下啊,因为这个不用提交,我们直接给不同的参数就可以解析这个目前这个任务长什么样是吧。大家看,如果给二的话。哎,这里边这个图就又变了。啊,这里边这个前面S还是一,这里要解释一下,之前也提到过啊,这个S如果是socket文本流的话,你不论并行度设几,它的并行度都是一,这是因为它这个底层实现啊决定的,它本身就不能并行,它本身就是一个串行的,串行的这样的一个文本K的流啊,所以这里边你就设置什么都没用啊,那所以如果加上这一条的话,前面我们讲的那个优先级别,那最高优先级,其实它的那个代码的底层实现,如果底层实现就不能设,你即使在后边给他设了这个set para也没用了啊啊就是也也不会调大了,无论如何它都是一,然后后边的呢,那默认非代码里边没设嘛,这边以提交时候的杠P参数为准,那么就并行度都是二,这个时候大家看变成了三个方框。
02:42
然后后边的两个方框并行的都是二,所以拆开之后它应该有两个并行子任务,所以我们回忆起来提交之后是不是一共就有五个任务啊啊,所以呃,这个他他行为怎么会这么诡异呢?他为什么之前这个S和flat map是可以合在一起的,现在怎么又不能合了呢?
03:02
要解决这些问题,接下来我们首先要解释一下。数据传输形式啊,其实这个概念也并不陌生,因为给大家说一下的话,大家就会联系起来之前这个Spark啊,包括loop里边的一些概念,呃,就是在这个程序里边,首先不同的算子它可能有不同的并行度。然后呢,不同的算子之间,它要做大家知道前后发生的这一个。呃,前前后定义的算子,相当于就是我们执行任务有有先有后嘛,那么一个数据在前面的任务执行完毕之后,肯定就应该要传递到下一个任务里边去做执行,那数据在不同的任务之间传递的形式。有可能就有两种情况。哎,是哪两种呢?一种叫做one to one,或者叫做直传直通forward forwardly,也可以叫,另外一种叫做redistributing,就是重新分配。
04:01
这两种形式表示什么含义呢?其实简单来讲就是说。按我们之前的那个观点来看的话,大家就非常明显了啊,比方说这是第一步操作就是一个S。这是第二步操作,这是一个flat map。我现在既然一步操作一个算子下边有两个方框,那自然就是表示它有两个并行子任务了,这是设置了并行度的啊,那大家自然就想到了,我如果要是数据要往下传递的话,那是不是有可能。我当前这两个任务就都在一个机器上,甚至我有可能就在一个,就是共同占占据了一个资源,然后我直接就在本地去执行了啊,那大家想这是不是就是一个直通模式啊。直接就在本地继续执行不就完了吗?啊,所有的甚至所有的这个数据都不用动啊,就就就直接拿出来啊,内存里面拿出来用就可以了,这就是所谓的one two one,也就是forward。
05:00
那另外呢,有可能我会跨区对不对,哎,我有可能会分配到其他的这个内存空间里面去,或者还有可能会传递到不同的task manager里面去,还涉及到网络传输了。那这种情况的话,这当然就是重分区重分配了,Redri beauty啊,所以简单来讲的话,呃,到底哪些算子之间是one to one,哪些又是redistributing呢?这其实有两个。两个影响因素。一个。是本身段子,它自己这个操作的概念,它的含义,比方说就是说像这个S和map啊,大家知道S和map一个是读取数据源,然后读取来之后做map,大家知道就是针对每一个数据直接做转换嘛,那这个直接做转换当然就没有必要再把它扔到别的分区去,对不对,那你直接就在本地做不就完了吗?只跟他自当前一个数据有关系嘛,不用去做任何的重分区的操作,所以当前的流里边就会维护着分区以及元素的顺序。
06:05
也就是说,如果后边要做一个map的话,之前S算子处理完的那个数据直接拿过来用,直接拿过来处理就可以了,这样的话SS一个接一个处理后面map,也就是处理完一个拿过来一个继续去处理,保持原有的顺序。这样的操作。他们两者之间。它们的关系就叫做one to one。那具体最后传输是不是one twoone呢?呃,其实这个还不一定啊,因为大家想到即使是S和map之间。后面我们说了啊,前面这是一个S,后边是一个map,假如S这里是一,Map是二的话,精度啊。Map是二的话,那它难道还是我就只在本地一个分区里边去去传递数据吗?那你后面这个并行图不就没用了吗?所有的数据有一个分区就完全用不上。
07:00
所以在这种情况下,他会怎么样呢?他会把S任务前面这一个分区处理完的所有数据平均的分配到下游的多个并行子任务上面,所以它会重新分区。啊,那这个这种方式就又变成了所谓的redistribu,就是重分区对吧?啊,那呃,重分区呢,它本身的概念就是说它的分区会会发生改变,它其实有两种情况,一种情况就是像前面我们说的啊,前面我们说的这个就是并行发生改变的时候,它当然就会引发它的重分区,那另外一种情况是什么呢?另外一种情况就是算子本身之间。他们就会引起重犯去操作,比如说什么,比如说K。前面我们说过KPI是什么?KPI是按键分组啊,那既然是按键分组,那是不是同一组的数据,我应该要把它发送到。如果要做一个K。
08:01
对,然后后面做一个some,大家知道。这两个合起来,这是一个。这是一个算子啊,假如前面是一个map操作,Map完了之后,他要做一个KBY按键分组,那是不是同一组的数据应该放到同一个分区里啊,诶,那所以这个没准啊,我来了一个一个数据有可能是这一组的我放到这儿了,那再来一个数据,有可能我应该分到下一组啊,那是不是它就有可能即使并行度相同,也有可能会出现重分区的情况啊。哎,所以redribu这种情况有两种,一种就是前后算子之间,它本身就产生了像KBY这样的重分区操作,另外一种就是并行度发生了改变,也会出现重分区,所以大家就看到了,这不就相当于沙否吗?啊,这就自然就联想起来了,跟这个Spark里边的杀父非常的类似啊,那如果要是对应的话,One to one,这个就很像我们之前Spark里边所谓的窄依赖的关系嘛,啊,那如果要是呃,这个redribu的话,这就相当于是宽依赖。
09:07
这个联联系起来的话就会比较好理解啊。有了这个基本的概念,有了数据传输形式的概念之后,那自然我们再来看这里这个图合并的这个效果大家就知道了。怎么样的操作可以直接合并在一起呢?啊,最终的结论其实就是啊,它的传输方式是直传的方式,最终是直传方式,那么就可以直接合并在一起,所以本质上来讲就是。算子之间是one to one的这种关系,而且并行度还相同,这样的话它是不是就可以直传啊,诶,所以大家看前面这个。如果是一的时候。到此和Fla map就符合这样的条件,我们说直接读取数据源进来之后,然后到Fla map,它它直接针对当前数据去做转换就完了嘛,所以他们并行度又相同,都是一,那么就直接可以直传,不涉及到重分区,它俩就可以合并在一起,那后边同样啊,这个聚合之后的结果,再去做输出的时候,你来一个输出一个不就完了吗?哎,并行路由都是一,不涉及到重分区,所以直接合并在一起。
10:17
而flat map和后边的这个K之后的some kid。之间他们会根据K去做一个重分区啊,尽管大家说这不对呀,你这里边这个并行度是一啊,怎么怎么它还会重分区呢?啊,这主要是跟它的这个算子类型有关,算子类型它们之间就不是一个one to one的传输的方式,所以这里边它们就不能合并。直接要断开,大家看到这里写了一个哈希,为什么叫哈希呢?因为KBY,它是基于K的哈希code去做了一个重分区啊,这个稍后我们讲API的时候会给大家说啊啊,所以这里面写了一个哈希。这就是关于这个合并。
11:02
不同的这个算子合并起来的这个概念大家知道,那在在这个并行度改二的时候,为什么它就多了一个方框呢?后边这个还是一样啊,聚合之后的输出它俩可以合并,然后中间呢,因为做了KBY,哎,这这不是一个one to one的传输方式,所以说一定要断开,那前边的S和flat map为什么要断开呢?因为并行度不一样。数据要重新分配了,所以它是一个这个叫做re balance,它叫做重平衡再平衡啊啊,这也是不同的这种重分区方式,所以出现这种情况的时候,他就必须拆成不同的任务。所以总结一下哎,Link里边这种方式啊,叫做合并钻练叫做operator check啊,有时候也会把这个叫做任务链啊,就但是一般从直观英文上去翻译的话,Operator chain翻译成算子链可能会更加的合理一些啊,所以这里我们叫算子链,那算子链到底应该怎么合并,那就是。
12:06
前后发生的两个算子连接起来,有这个数据传传输关系的两个算子,如果本身他们的操作是one to one的操作,而且并行度又相同的话,这个时候就可以直接合并在一起,所以大家可以看到这里这个图啊,啊,就是假如说我们这里这个S和map它可以合并吗?当然可以合并,因为他们本身是one图one操作,并行度又都是二,那可以合并在一起,而后边map和这个KBY之后开窗,这个可以吗?不可以,KBY大家知道它有重分区操作,对吧?它俩之间的这个数据传输本身就不是one to one了,所以这里边就必须要断开,然后这个经过窗口操作之后,然后再到输出thinkk,他们可以合并吗?不行,因为并行度发生了改变,又有重分去调整了,这里也是必须断开。
13:02
所以最终。大家可以看到啊,就是我们是并行度相同的one to one前后发生的one to one操作,我们就可以用一个叫做算子链或者叫任务链的一个优化技术,把它链接在一起,形成一个大的任务,一个task。最终的。Task啊,那那最终原先的每一个算子就变成了里边的一部分了。所以大家总结起来的话,那就变成并行度相同,并且是one to one操作的的话,就可以合并算子链,而且这两个条件是缺一不可的。那这样做的好处是什么呢?啊,好处就是说我们可以减少开销嘛,那你想如果把它合并成一个任务的话,那是不是就可以直接分配同一个资源就完事了,那那分配同一个资源,那就所有的都在同一块内存里边操作嘛,我就不用再去做数据的传输,不用copy到别的内存里边,呃,特别是更麻烦的是,你有可能还要做网络传输,传到别的task manager,就避免了这个杀的过程,这当然是一个非常显著非常明显的优化。
14:09
开销就会变得更低,所以最终的效果前面我们合并起来之后,大家就会发现了啊,Source和map可以合并在一起,得到了一个合并,到了最后这是一个任务,然后它并行度是二的话,那就有两个这样的并行子任。然后后边的这个K之后开窗做窗口操作,这是两个并行的子任务,Think是一个任务,所以总共我们前面四步操作啊,如果前面并行度设二,最后一个并行度设一的话,最后效果就是合并成了五个任务。也可以看多代码最后的效果底是什么样的一个结果啊,也是到的是后应该是五个任务,对不对啊,跟这个有点类似啊,只不过它现在我们这是第一个SS任务是一个,后边是两个。
我来说两句