00:00
好,那接下来我们看一个map端另外一个比较显著的优化,增大map溢血时输出流的buffer,那这个东西呢,其实就跟咱们沙手有关的,那么在最上面咱们放了一张,呃,这个总结出来的一个沙否流程图,那我们放到下面一起来讲啊,咱们结合源码源码来给大家分析一下啊,那首先咱们这个salt沙。那么他在易写的过程中,会先写到一个内存缓冲区,那么大家注意咱们要调的参数不是它啊,网上你如果你查一些技术帖,他说这个就是这个map端的。缓冲区的话,内存缓冲,那这其实是错误的,甚至他理解为这个是一个溢血的预值,说调大这个值呢,可以减少溢解的,减少那个溢写次数,这个理解完全是错误的,那么也会咱们也会从源码角度给大家分析分析为啥是错误的,那么在接下来呢。
01:02
咱们来看看。那这边其实是这样,先说结论啊,首先这个map端溢写的这个内存缓冲啊,它默认是五兆的大小,然后呢,它其实是会自动扩容的,也就是说它如果咱们是往这个内存存的数据超过了五兆,它判断到之后它会尝试着对五兆进行扩容,那扩容到当前使使用内存的两倍啊,比如说你现在内存占用已经达到了六兆大于五兆,那么它会再去申请七兆的内存资源,也就是说要申请到当前内存的两倍,当然如果呢,咱们申请的这个内存申请不到了,那他就会判断哦,我该溢血了,他应该是这么一个过程啊,所以那一些说法是错误的,它并不能减少溢血次数,但我们这个输出流的buffer,它是能提高溢血效率,那这个怎么提升呢?其实它是在溢血的时候,它是通过输出流缓冲来写的啊,就是一个。
02:09
啊,Put string。啊,Buffer是dream up put的buffer string应该是通过这个东西来写的,也就是说其实你用到这个东西已经开始溢血的,你还怎么去减小溢血的次数呢,对吧?那么还有一个事儿,就是咱们一写到文件之后,它其实还要序列化的,它序列化的时候默认是按照1万条一批次去读,呃序序列化器是默认按照这个批次1万条啊,而且呢,这个1万跟咱们之前这个缓冲区的五兆是调不了的,对吧,它是无法调整的,那么这个东西呢,咱们从源码角度给大家讲一讲啊来第一个咱们找到那个沙否的那个源码。咱们简单看这就行啊,它其实是做这么一个判断,Maybepi这个条件是用这个方法是用来判断要不要进行一写,对吧,上面这里也有一个注释啊,说的明明白白了,在溢写之前判断一下需不需要溢写对吧?那你看前面是做了一个魔上32的比较,呃,是不是32的倍数啊,这个数据调数,而且呢,这个时候当前内存这个就是当前缓冲区已经使用了多少内存,那么跟咱们的这个阈值,它会去做一个比较判断,那在之后呢。
03:34
这个呢,它会计算出两倍的当前内存跟阈值的一个差值,你看是不是二乘以当前内存减掉阈值啊,这个默认是五兆嘛,对吧,然后这个他计算出得到一个他要去申请的内存量之后呢,他就调用了一个请求内存的方法得到了。申请后的内存,那么这边有个细节要注意,那如果按照他这么讲,岂不是无限扩容呢?哎,不是,关键就在于这个申请内存的方法,因为他申请可能申请不到,对吧,难道你说要多大一定就能给到你多大嘛,那就不一定了,正是由于此,所以他才会意见嘛,然后再往后呢,大家可以看到它会把这个阈值提高,你看。
04:19
把申请到的内存添加到这个阈值变量里面去,也就是说咱们举个实际例子啊,呃,当前内阈值是五兆,当前内存已经使用到了六兆,那不会D满一些啊,它满足这个条件,然后进来呢,尝试着去申请两倍当前内存的差值,也就是说这边是不是应该是二乘以六减五啊,是不是要申请七兆的内存啊,对吧?那如果他执行申请得到了七兆,他又把它七兆加到预值里面,那这个五是不是增长到12兆了,那是不是就不用一写了,对吧?它应该是这么一个动态申请的过程,咱们是从源码的角度来讲。
05:02
那么如果大家要看的话,我可以带大家点一遍啊,呃,应该是那个external sort sort啊,在这可以看啊,里面有个方法叫maybe spear。啊在这里啊,是这个这这个类里面最终调用到了这里,然后呢,这个就是咱们刚才截图图片看到那个参数啊,那个图片那个位置,那这个阈值带大家瞅一眼啊点进来,然后呢,这边它是通过这个复制的叫初始化的内存阈置再点。那你会发现它是一个获取的一个空一个配置项,再点进来。往下拉,那我们可以看到这里是有一个参数的。初始化的内存阈值,那么它的默认值呢?是这么多个字节的单位是字节,那字节去掉这不就K吗?再去掉不就兆嘛,所以我们说它的默认值就是一个五兆。
06:05
而且有个细节,这个变量它标记为什么internal internal就表示它无法被覆盖了,都要不可见的啊,点一下大家可以看到它的public属性是被置为false的啊,咱们是没法去把它做一个修改的,那你随便翻一翻,你看看其他的参数啊。像这种他们就没有那个internal啊,没有internal去指定另外一点,为什么说他不行,咱们也有证据,咱们打开Spark的官网,这个Spark的官网对吧,咱们点击文档,点击老版本,因为咱们看的是3.0.0对吧,版本是完全一致的,再点上右上角这个more,选择这个configuration。好,选中,那么在这里会有Spark所有的可可配置的配置项,那我们可以尝试着搜索这个配置项,或者我搜关键词,来CTRLF搜,你看你会发现这里并没有这个参数,这个参数是呃,你修改是不会生效的啊,它是不不会被修改的。
07:15
啊,这是第一个事儿啊,给大家说的就是,呃,所以一些技术帖,嗯,大家也不用盲目去相信对吧,你可以去找找源码确确认一下啊,应该这个才是内存缓冲区的初始阈值啊,这是五兆,那那个输出缓冲区在哪呢?它是在溢写的时候,你再往下翻一点,是不是有一个溢写的对吧?他判断哦,应该溢写的。应该预写的条件就在上面,这当前内存已经是大于阈值了,那既然会大于,说明就是它在自动扩容的时候已经申请不到内存了,对吧,无法再去申请了,那这个时候就要一写了,那我们看一下一写这个方法来我们点进来,呃,这边没有对吧,这应该是一个trade的一个特质,CTRL加H,咱们可以找这两个是。
08:08
两种使用的缓冲啊,对应两种缓冲,一种呢是做提前预聚合的,一种是不做预聚合的,如果是map类型,就是要做预聚合啊,就聚合,那这个是不聚合的,你看哪一个都行啊,他们的逻辑大差不差的啊呃,那比如说你咱们看简单一点的吧,看这个。这个里面应该也有一个SP方法对吧的实现。嗯,咱们找哪呢?找一找啊。Spear spear。
09:06
啊,在这里SP方法啊,收到了,那么这里我们可以看到啊。主要的核心在这里,它是不是生成了一个一写文件呢,对吧,那这里名字也很直白了。一些什么内存里面的数据,到磁盘里面去来点进来,那么点进来之后咱们瞅一瞅,它是用到了一个磁盘的快管理器啊,然后呢,这边创建了一个writer,就写出T对吧,这边有一个参数叫。Fire buffer size,文件输出流的大小,那这个参数是个重点,就是咱们所谓的那个输出缓冲区,就是它来点一下呃,获取参数对吧,再点进来这个就是这个参数是Spark shuffle fire buffer,那我们可以看到默认是KB对吧?它是多大呢?32K。
10:02
对吧。这个这是输出流缓冲区的大小啊,也就是说已经要往外写了,就像你一个,呃,要泄洪的时候,你你的缓冲区就是整个河道,它才是真正的缓冲区,对吧?那可能有一个水坝就是它的阈值,那接下来你要泄洪要排水,你比如说开了一个管子,要去把这些水排走。已经确定要排了,那你这个管子的大小是不是可以决定你这个排的效率啊,啊,所以这个缓冲区指的就是这个管子啊。缓冲输出流缓冲区啊,32K好,那我们回退。那他就得到了一个writer啊,那这个writer呢,在下面它会执行一个write的方法,来,我们直接往下看了。你看这里有一个right next,就是一条一条的去写,对吧,那写完之后呢,这中间还有一个小知识点,就是这里你看每写一条他都会怎么样呢。
11:10
都会计数啊,说我已经写出去一条了,这个这个变量是用来计数的,然后呢,如果这个数量等等于一个东西叫batch size,前面这个就是序列化器,如果等于序列化序列化器的大小,那么它就会执行一个什么刷写这个是flash到那个,这是由蓄炼化器啊flash到蓄炼化器。那就真正的写到磁盘去了,也就是说他开始写,写完之后是攒1万条,然后才真正的落盘成磁盘文件,你看。Flash那个磁盘的写出器,内容到哪里呢?到磁盘,然后。这就这个事儿,那这个批次大小给大家瞅一瞅它的参数啊,你点一下,呃,这个是一个赋值,再点,点完之后是这么一个参数啊,叫shufflepire batch size,它是默认是按照一万一批次去写的,那你看这个值是多少1万,那同样的它是一个internal,同学们,Internal就代表咱们刚才说了,无法被覆盖,无法被修改,它的public属性是一个false。
12:26
好,那这个参数呢,你同样可以去官网去搜Bach size在这搜啊,那这个不是沙佛be size吧,对吧,在搜这也不是,这也不是,这个也不是,也不是没有对吧,没有这个参数是无法去指定修改的啊,那这里面就是给大家要介绍的几个事儿,所以呢,如果你看到一些技术帖说增大map一个。Buffer,然后呢,说可以减少溢血次数啊这种它然后把这个buffer当成是。
13:04
把这个buffer当成是这一块缓冲区的话,那真的是大错特错了,这一块缓冲区我们知道,来我们一起总结一下啊,一个参数首先有三个啊,第一个map端的buffer,也就是他的缓冲区是这个。缓冲区有两种,第一种呢就是支持呃,聚合的,第二种呢是不聚合的对吧,对应咱们刚才看到的两个力啊,那这个默认大小都是五兆。那么。它的溢写条件呢,就是当前内存大于这个阈值,但是呢,大于的时候,它会尝试申请内存,申请到当前内存的两倍对吧,那么当他申请不到内存,当前内存还是大于增加后的这个阈值的时候,它就要进行溢写,那么溢写的时候,它是通过一个叫输出流缓冲区啊输出流缓冲这种方式来写的,那么写的话,它默认的这个输出缓冲的大小是32K啊去写,然后呢,它是积攒到1万条才会刷写到这个磁盘的文件。
14:13
就是一写文件啊,也就输出缓冲,它不断的写啊写啊写,然后呢,写到了1万条就刷,写到了磁盘上面,应该是这么一个过程,那其中咱们可以调的参数就是中间这个输出流的缓冲,那像原始。那呃,缓冲阈值的五兆跟这个批次的1万条,咱们是无法去调整的,无法去调整好,这个是咱们从源码的角度深入给大家分析和讲解,对吧?啊也不是瞎说,所以咱们学习知识点呢,还是得有自己的判断啊,那么咱们上硅谷对这些源码每个框架呢,也都会去慢慢的做一个梳理啊,那目前呢,也有很多了,那咱们一定是实事求是基于源码来说的。那接下来我们就要来考试一下这个提高的效率,那在我们的代码里面,Map文件夹有一个map fire buffer ten,那在这一块呢,这个还是咱们那个老代码啊,下面这个查询出三张表,呃,然后呢,三张表就起来就没了,还是这个逻辑啊,然后上面呢,咱们这个参数就对这个。
15:24
呃,这是并行度还是设成36,重点是这个参数,同学们32K,你K写不写都行啊。那么。我们呢?可以把它调成那个,比如说调成64对吧,它默认默认是32了,64,那不调的时候默认就32了,那这两个呢,其实是不生效的啊,这两个不生效,写在这是为了让大家去测试,去测试的啊,先做掉吧,也没用。
16:02
好,那么接下来呢,我就打包咱们执行一下啊。咱们跟前面的一个对比啊,这个没有关广播join,那不就跟咱们前面那个MAT join案例是一样的代码嘛,所以我们历史服务器做一个对比就好了。等待他跑完。好,那咱们把这把这个炸包拷过来。覆盖好,我这边是直接提交啊。就是咱们这个发八法啊,我之前考过一遍。好,现在执行完了,咱们直接来看一下历史服务器刷新啊,最新的这个点进来,那这个呢,三张表有一个map对吧,咱不用管它的,咱们看上面这个,那在这里是有沙否的。
17:16
有shuffle的话,大家可以看到有两个沙Le right,分别两张表的数据交换,还有一个最终表的数据读取,那我们这边从这张图可以看到,咱们优化的是这一块应该是right阶段。所以我们直接来看这个right的时间差距就可以了,我们找一个数据量大一点的来看吧,就看这个啊。1.5g的这个点进来往下拉呢,你可以从上面看这个图啊,这个黄色的就是。沙写的时间,这些你也可以看下面。下面这边有统计好的写的时间,那咱们呢,截个图,跟之前的这种做一个对比,截个图啊,那跟哪一个对比呢?跟咱们这个对比就行了,S啊不是那个。
18:12
哪呢?咱们是不是有一个map join呢?我记得我找找吧。Ski map啊对,这个可以。Ski ski ski map doing,好,我们看看这个逻辑是不是一样的啊。呃,你看这边也是查询出三张表对吧?呃,然后对它进行一个交,那上面的参数啥也没有啊,这个值是默认的十兆,并行度也是36,那是不是它的基本环境,除了刚才讲的几个参数之外,其他是不是都一样,所以咱们看这个任务就行了。KI对比一下它杀Le right时间,那也就是在这里了。
19:04
啊,那我们点一下还是看。上面这个下面这个广播不用看了点,那我们看到下面这里还是一样看到1.5个G的这个shuffle right,我们看一下它的时间,这个是没有调整buffer的时候啊,其实看这个图就知道,你看这么长。对吧,这么长它是效率被提升,来对比一下同学们。这是不是基本在一秒钟左右啊,一秒两秒,这边呢,甚至多的时候达到了五秒四秒对吧,这个就是咱们对一个shuffle right的一个性能提升啊。很直观啊,那么你要验证另一个事儿的话,就是这两个参数不起作用,现在是打开的对吧,而且我故意调大2万条,然后这个呢。这个阈值故意调的比五兆还大,对吧?就这两个不能改了,你可以把两个吐掉重新执行,再对比一下时间,那我就不去做了,因为我之前都做过了,没有提升,应该是跟咱们64这个是最重要的参数啊,这是一个演示跟说明,那么希望大家能从中呢啊学习到一些东西啊,也学会去翻页源码。
我来说两句