00:00
那接下来我们看完另一个读取小文件的一个优化,也就是说咱们在读取数据或读取表的时候,数据源本身有很多的小文件,那咱们怎么样进行一个读取小文件优化?咱们可以回顾一下have have的话,它是不是有一个默认的输入格式啊,是一个combine input format,那么基于这种输入格式,它会自动将多个小文件读取到同一个map任务里面去,那咱们这边是Spark,能不能实现呢?啊,有几个参数可以调整,那第一个参数叫。呃,每个分区最大的字节数默认是128兆,也就是说呀,我一个分区里面最多,呃,能。有多少多大的数据,也就是说,比如说你小文件有很多个呀,那它可以支持读到同一个分区,那么最多一个分区多大呢?啊,默认就是128啊,就这样子把多个小面合并到128,读到一个分区里面去,这是最大值达到128,还有一个是它特有的叫文件开销。
01:07
啊,就你打开这个文件的话,它的一个开销默认是四兆啊,那如。这两个参数就可以来搞定咱们一个小文件的一个问题,那么大家可以看一下咱们这个代码里面map这个包下面有一个small fire ten这边我对这两个参数进行了指定,文件的开销的话,现在呢,咱们还是给的四兆。还是默认值,另外这个最大的,咱们为了直观的看到一个效果呢,我把它本来是128,我设成12兆,咱们来做一个观察,那我们读取的表是一张csc,这张表我们来瞅一眼他的HDFS。来,我们进到库的路径下面,Csc是这张表对吧?点进它的分区路径下面,那么大家可以看到这张表,原先呢,就有很多很多小文件。
02:06
对吧。可以可以当成是有很多小文件。呃,有大三十个对吧。38个。那还有第二页。好,那我们看看我们读取进来之后有怎么样一个效果。那我们先执行一下,然后呢,就这两个参数,我会从源码的角度给大家做一个分析,特别是这个文件开销,它到底有什么作用。啊,这边还是有些不一样的,那么如果你在网上搜一些资料的话,他可能会告诉你,呃,他说这个呢,就是呃,合并小文件的阈值,也就是说他认为啊,文件大小小于这个值的话,可以被合并到同一个分区,这个说法呢是错误啊,咱们一会儿呢,从源码来看好吧。
03:00
所以如果你按照网上的一个说法去验证的话,你是看不到效果,那是因为理解本身就出现了一个偏差。好,废话少说,我们先把这个程序提交执行,然后看一下再来梳理源码。好,等待他执行完,咱们去看一下他的一个分区数,分区情况,原本呢,是不是有38个对吧,咱们刚才已经看到了。那我们程序执行完了,我们先去看一下它的分区情况啊,刷新找到咱们这个small fire turning点进来,再点进来,那其实大家可以发现它只有一个stage,因为咱们就是代码很简单,读一张表,然后写到另一张表,那这个时候大家看到它的task数量是32个,那么原先呢,它的一个。文件数应该是38个,那至少有一定的减少对吧,那我们再看看为什么减少这么少。
04:07
如果按照网上的说法,那个文件开销默认四兆的话,是将小于四兆的文件加入到咱们的合并到一个分区的话,那这边这些六兆肯定就不行了。那再往后看,后面咱们有一些三兆多的文件,那这些讲道理是不是应该可以合并到一起啊,对吧,而且合并到12兆对不对,那应该是三个合一起,三个合一起,三个合一起。对吧,应该少个。六七个左右。那我们来分析一下这一个过程啊。那首先呢,咱们这个文件开销并不是那么简单的去理解的,那这边是我在源码截取的,如果你要看的话,直接搜这个类,那其实你从。
05:00
咱们的执行计划。啊,可以看到它底层是转成RDD的,是用了这个fire sc r DD对吧,那你可以一步步点进去,那我们直接看核心逻辑啊,就在这个地方。来我搜给大家看,CTRL加N搜一下这个类。那在这里呢,CTRL加F12啊有一个方法。那个叫什么来着,咱们直接搜啊,Create bucket。好,那这边它区分两种有有没有分桶,一个是有的,一个是没有的啊,那咱们这张肯定是没有好,那么进来之后呢,咱们看看它这边有计计算一个切片大小,最大切片大小,那这一块怎么算的呢?它的逻辑在这里点进来,那么点进来之后大家可以看到这里最终它的切片大小取的是这个。
06:08
是通过这个公式来取的,首先第一个叫默认的最大切片大小,这个就是咱们那个默认参数128啊,也就是这个参数值,就是这个参数值。默认128,那咱们是不是改成12了对吧,那我们再看一下,后面他还计算了两个东西,一个叫文件开销,一个叫每一个并行度处理的数据量,也就是数据分片的大小。那这个咱们默认是四兆。那这12兆它是不是。核心是算这个东西啊,那这个东西怎么来呢?咱们再往上看。就在上面这一行,它等于文件的总大小除以默认的并行度,那这个默认并行度呢,默认就是RDD的一个分区数啊,默认就RDD的分区数,那么。
07:04
这个呢,Total base,它并不是简简单单就是说总文件大小,它还有一个什么呢。那么大家可以看一下。他在取这个大小的时候啊。它是fires是把所有文件取到啊啊,然后把所有文件的总大小计算出来,这就是总文件大小,另外呢。是一个一个的文件,对吧,因为咱们map是对一个一个处理的,是一个文件,一个文件的大小,插上一个文件开销,那你想想如果你有十个文件。假设你每个文件是三兆,那么它计算出来这个total base应该是多少?是不是十个三兆乘以十,再加上每个文件打开都得加上一个开销,那就是四兆乘以十,所以它算出来的这个总大小应该就变成了70兆,而不是简简单单的30兆。
08:05
这个就是它总大小的一个计算,然后呢,取一个最大值。取一个最大值,咱们文件的总大小肯定是比这个大的,对吧,这是四兆嘛,这个肯定是大于四兆,除一个并行度,我的并行度呢?呃,那因为咱们那个直行应该可以算到,是不是三乘以二啊,是不是六个并行度啊。对吧,咱们是三乘以二嘛,咱们提交参数啊,六个变异度。那所以这个时候,呃,除出来的这个值应该还是大于四兆,也就是说这两个取最大它赢了,那取最小呢,这原本是128。那肯定是他赢了对吧,那咱们现在改成12,应该是这么来计算的。那我们再回退它得到一个最大切片大小。那后面呢,他就会去C盘文件了。
09:02
啊,这就是他的读取方式啊。那最后呢,他也就得到了一个分区数啊,根据这个最大切面数。然后封装成了一个fair skin r DD,对吧?所以呢,在下面我也给大家列出来了,呃,它的一个最大切片大小就等于这个值。那么计算这个的时候,每个文件都要加上一个open开销啊,另外是不用再加了啊,这我当时都写了,也就是说呀,咱们就是一个文件加一个open开销,一个文件加一个open开销,一直加加加加加完之后呢啊,这个没有要小于等于咱们那个128兆的那个最大单个分区字节数,对吧。这边的稍作修改,所以最终呢,咱们来判断依据应该是这个N个小文件的总大小,加上N个文件开销要小于等于最大的这个分级数啊,应该是这么来算的。
10:11
这个就很直观了啊,很直观,那所以咱们在调这俩参数的时候啊,哎,我这边案例是随便写的一个12兆对吧?啊,那如果你把它改成128兆,再去调整一下,然后呢,你也可以去调整这个文件开销,那这个文件开销调多大合适呢?这个参数注意啊,它是对于小文件过多的时候,那么它呢,有一个很好的效果。那文件开销咱们设置的可以接近于小文件的大小,这是最合适的。这个值设的太大的话,是不是合并不了太多小文件,大家可以想一下,比如说咱们最大是128,那我有N个小文件,每个小文件比如说就是四兆。啊,就是四兆,然后呢,你把这个开销设成了100兆,你想想它只能放几个文件进去。
11:04
对吧,你一个文件,那就是四加100,就就已经是呃104了,那你再读取一个小文件,再四加100。再加四,再加100,那是不是已经大于这个最大切片了,那当他发现大于最大切片大小的时候,就不会再继续读文件到这个分区里面了,也就是说这个时候这个分区其实只合并了两个小文件。啊,那这些开销又是无法避免的。那所以呢,咱们这个开销不适合设的过大啊,最好呢,就是接近小文件的大小,小文件大小,那么大家这个可以去反复测一下啊,这个可能一般没有人去关注,但这个确实是有用的,这个东西。那如果咱们要测呢,你可以把这个调大一点,对吧,比如说40兆,看他合并效果肯定不能很多去合并。
12:02
这是关于小文件合并,咱们从源码角度还有一个实际效果啊,去看一看,那刚才咱们那个案例呢,咱们再来调整参数试一下啊,那么现在呢,我做了一个啥事呢?我将咱们的这个文件开销改成了七兆。啊,这字节啊,改成了七兆,原来四兆,另外呢,我将这个最大分区数还是改成了128兆,那接下来我就直接简单一点在idea执行就好了,那么大家要知道现在我的线并行度应该是多少12,因为我的电脑是12线程的啊。那我在本地执行,瞅一瞅它的变化,并行度会变成多少,那这边我把死循环打开,我们看一下效果,那为了方便观察我。呃,当然我把并行度设成一了啊,咱们来再接下来观察一下。
13:02
对比一下不同的效果,多玩两遍。嗯。现在应该执行完了,那我们先看一下,呃,一个本地UI吧。咱们用lo host看一下,因为我是死循环,不会关掉,那么大家可以看到它一共只有一个三个了,三个task。那为啥是三个呢?我变形度是一啊,变形度是一来。大家看一下那。这大家可想而知,这个新的表应该有几个数据文件的,应该只有三个对吧,你看咱们的代码是写到这个test这张表的啊,Test这张表,那我们点进来刷新会发现原来可能是咱们是收到了32对吧,之前执行的那一次啊,那个参数比较小,那接下来你看这个呢。
14:05
是不是直接就变成了三个文件了,而且呢,这边合并后的文件大小才多大呢?才70多兆,他们明明最大值设的128对吧,但它到不了128,因为它要加上文件开销,结合咱们这个来考虑,这是128。那所有的咱们理想中是所有的文件大小小文件加起来达到128,再一个分区对吧,但是我们说了每一个文件都要加上一个文件开销。那这个我们也可以代入这个算一下啊,那我们回到源码的地方,呃。这里。首先计算一下最大的一个切片大小来进来,为什么?呃,咱们这个我瞅瞅咱们这张表多大啊,咱们通过这个地方应该是可以看的。
15:01
啊,这个circle这里执行计划,这里有信息统计嘛,啊,你看这张表。一共要读210兆,好吧,210兆好,咱们把它代入进来算一下,呃,那通过这里的总大小计算。210兆。那一共有几个文件呢?咱们应该有38个文件对吧,每个文件都要加上一个开销,咱们是七兆啊,这乘起来是多少啊。啊,反正是一个很大的值,对吧,为啥呢?那这么大的值是不是还要除以一个默认并行度啊,那咱们默认并行度是不是一啊对吧,你除掉之后你再来看。也就是说这个值等,呃,就就这个值,它就等于210兆加上38乘以七,因为咱们开销改成七的,然后再除以一,这个是一个很大的值,那取最大值是不是肯定是它呀,然后再跟这个默认最大128取什么取最小值,所以最终这个切片大小就变成了128兆,好,这个时候有的同学还有疑问对吧,咱们再回退。
16:21
回退完之后,他把这个最大切片数,他再往下走。往下走,他是不是要这个地方要计算,你得到一个计算分区数啊,啊,他还要带进来计算,那我们进来看一下,在这个方法里面,这里有做了一个事儿啊,咱们看这一行。啊,它首先定义的一个叫current size这么一个变量,默认是零啊,初始值是零,那么零之后呢,它每一个都会判断当前文件大小,加上新的这个文件大小有没有大于最大切片,也就咱们刚才计算是不是128。
17:03
如果已经超过它呢,那么就关闭,那这我们的疑问就在这里啊,那刚才明明合并到才70多兆啊,还没超过它怎么就关闭了呢?对吧,怎么一个分区没有达到128呢?你再往下看这个地方。咱们这个当前文件大小,每计算一个文件都会加上一个文件开销。这就能理解了吧,然后呢,再去加上新的一个文件。它是这么来计算的啊,啊,这个是Fi,那这个文件大小是这么来算的,核心就在这,咱们当前大小都会加上开销,所以也就是说咱们合并后的70多兆跟128的差距中间是不是咱们的文件开销,那我们算一下大概有几个相差呢?大概是50多对吧,每一个开箱是七,那是不是大概是七个文件盒一块。
18:00
对吧,对不对,中间不是差了五十来兆吗?那我们每个开销是七,这中间的损耗就是文件开销啊,咱们是填不满的,我们估算一下大概有七个,咱们平均每一个,呃,他就把它合并起来了啊。这个也就是为什么咱们合并后,这个地方它是填补板,咱们指定的最大值的填补板指定最大值,所以大家再思考一下,文件开销设得太大。好吗?对吧,包括网上的理解说,呃,这个值就表示啊,能把小于它的合并到一块儿去。这理解是错的对吧?这文件开销的作用是很多的,所以我们这个参数不能设的太大,开销那也不能设的太小,所以还是那个结论啊,这个文件开销最好设置的跟小文件大小差不多啊,那么至少能保证核掉一半以上啊答,这个合并后的文件大小会一半以上的块大小对吧?那这样就。
19:04
很好了,你看从刚才30多个,咱们优化到了三个啊,这个是需要给大家说明的,那我把这个源码这个地方也截进去了。呃。这个地方啊。也放到文档里,方便呢大家去观察,那咱们的核心逻辑在这里啊,这边是一个,另外呢,这是一个。好。那么大家注意这个事儿啊。
我来说两句