00:00
那接下来呢,我们来看一下另一种产生小文件的场景,也就是咱们动态分区插入的时候产生的小文见,这个应该说也是比较常见的,那这边我画了一张图,我们一起看一下啊。呃,那动态分区插入产生小文件呢,我分为两种情况,第一种呢,咱们没有经过否直接动态写入,那么下面是第二种,有shuffle再写入,我们来分别看一下啊,那么如果是没有沙uffle,比如说咱们。选择的,那那原来那张表查询出来呢,需要三个task,那么最差的情况呢,就是每一个task包含了咱们所有分区的数据,比如说呃,咱们这边设置了四个分区对吧。啊,这个数字错了,这个不要紧啊,大家知道一下是2021。2021。好,那么这种场景。
01:01
呃,SPA运行起来三个task,每个task包含所有分区的数据,这个时候呢,就有一个问题了,TASK1对比如说1月1号这个分区进行写入,那这个时候它就产生了一个一个文件,那么TASK2在执行写入的时候,将1月1号的数据也写入了这个分区同那这个中间就执行了两次写入,对吧?那有两个文件,那第三个task一样的,在写一号1月1号分区的时候,它也会写入产生一个文件,那么大家想想,如果这个分区数也就这个task数足够多,最差的情况这边是不是每个分区下面有,呃,SPA有几个task,也就有几个分区,这边就会产生几个小文件,对吧?那同样的道理,那么在其他的分区也一样的,都会产生这么多一个文件,那这个时候我们产生的文件数等于多少呢?就等于咱们这。
02:01
里边的一个并行度,也就是task的数量。啊,乘以咱们这张要插入的表表的分区数。对吧,那这种场景下就有可能产生很多小文件了,如果咱大家想想咱们Spark circle的话,默认的。啊,没有杀Le,那倒不会啊,如果task数达到了好几百,那是每个分区下面都会产生几百个文件,对吧?这就是没有杀Le的时候产生小文件,那我们可以主动进行杀否,将同一个分区的数据先合并到一起去,那比如说下面这种。哎。我在动态插入之前先distribute by分区字段,那这个时候在Spark它会先进行一个shuffle,对吧?Shuffle完之后咱们再去执行insert逻辑,那这个时候一个分区的数据都在一个task里面,那么再写过去的话,最理想状态下,每一个表的分区下面只有一个数据文件了,那这样的话就能很有效的解决一个小文件问题。
03:10
啊,那比如说咱们这个场景啊,首先咱们比如说写了一个得是beauty by啊,然后分区对吧?啊分区分区字段啊,我写个PT好了啊,那这个时候呢,你看一号的数据。是不是都到一个分区来了,二号的数据都到一个分区来,三号的数据也到一个分区来,四号的分数据到一个分区,那么在这之后呢,我们再来执行一个insert,那就是表的一个分区只有一个文件,表的分区只有一个文件,那这样就很理想了,但是呢,往往在我们实际使用当中啊,还有一个问题咱们要考虑啊,咱们不能仅仅停留于这里,对吧,还没完呢,那比如说呀,一号的分区的数据量有呃。比如说有1亿条。
04:00
那二号分区可能只有100万。那这个可能只有三号只有200万,呃,四号分区只有150万啊,这数据条数啊,咱们简单来类比一下,那这个时候咱们在呃执行的task里面,是不是一号分区的task执行的时间会比其他task要长啊,其实这种也算是一种数据倾斜,对吧?那这个时候呢,我们还有一个进一步的解决方案,大家回想一下前面讲的解决数据倾斜的思路,咱们是不是可以拆分了。对吧,将数据量大的那个任务把它拆分出来,然后呢,打散就行了。打散,然后再执行一个写入,那这样的效率就高一点啊。那咱们这边的实现大家可以思考一下,无非就是这样嘛,啊,我们是不是可以通过where,哎,咱们看文档,文档我写了。咱们可以按照这种思路啊。
05:01
呃,他们是插入一张表叫A,这边是举一个DEMO对吧,然后再分区字段是AA啊,小A小A,那么我们把它拆分成两段。插入逻辑第一段呢,我们是过滤出来。A,不等于大的那个分区键,也就是说比如像刚才不等于一号对吧,那就是剩下的分区数据都相对比较均匀,然后去执行一个插入,另外呢,一个细节就distributed by分区字段,那这样的话就会主动产生shuffle,将同一个时间分区的数据啊放到一个task里面去,再执行写入,那这边就是一个分区,一个文件。那下面这一部分呢,我们是通过VR过滤,当分区字段里面啊,数据量比较大的分区单独拎出来啊,这个就单独拎出来了,拎出来之后咱们还要做一个事儿,打散,那么打散呢,咱们是给了一个随机数,然后看你要打散成几份,你乘以一个整形,然后转成int类型。
06:04
就行了,也就是说这个数据量比较大的分区啊,咱们指定为五的话,那就是最终它会打散成五份,然后写入呢,就生成五个文件,那这样的效率呢,也有保证了啊,咱们就是这么这种方式来实现的,那我们来看看代码。在咱们reduce里面有这么一个类啊,动态分区小文件问题的优化,呃,那这边咱们是也是执行一个插入对吧,我把它拆成了两段,第一段呢是非倾斜分区,那么由于咱们目前准备的数据只有一个分区,也就是是DT等于20190722DN呢等于web a没有其他分区的,那我们就假定就是这个分区有倾斜,那咱们circle该咋写呢?也很简单对吧,呃,查询出。咱们要的数据源的那张表,然后呢,过滤分区这边是不等于不等于那个大分区,那剩下的就是均匀的,然后呢,在distribute by分区字段,让它按照分区去做一个数据集合,接下来执行一个插入,当然这个咱们执行起来肯定没有效果啊,就是看也不是没效果看不到,因为呢,这一块本身就咱也没其他分区了啊。
07:21
那另一块呢,咱们就是把大的分区单独拎出来啊,单独拎出来你可以拆分成多个旧吧,假设你大分区有很多,你可以把每一个都拆出来也可以,怎么样大家一起打散就好了啊,放在也就是说总的拆分成两份也行啊,那这边呢,分区字段,咱们把它大的分区键啊。过滤出来,然后呢,Distribute by是一个随机数啊,那我们这边取了个巧,呃,随机数我们乘以五转成整形,那这样的话其实就相当于呃分成五份,咱们来做一个打赏。那这个数字呢,你可以根据你的情况来调整,对吧,比如说你想打散成十份,你这边写个十也行啊,那你想打散成20份,写个20也可以,这个要结合你的具体情况来来定的啊,然后呢,咱们执行一个插入啊,也是动态分区的方式啊,动态分区哦,那我们来瞅一眼效果啊。
08:22
那这个。本地执行就行了吧,算了,我们还是直接那个拿过来跑一遍吧。Op Mo Spark,然后把这个提交命令拷过来,提交之后呢,我们等他执行完看一下最新,呃,被插入那张表的分区下面生成了几个文件,对吧。那么执行完了,我们来观察一下啊,我们看一下HDFS,首先咱们看一下代码读取的是这张csc的这张表,我们先看一下原先这张表。
09:01
其只有一个分区,只有一个分区对吧,那这边的文件数呢,38个。38个。当然这个跟咱们这个没有太大关系,那我们写入的表叫动态csc对吧,这是咱们起的一个表明,那点进来。我们现在演示只能有各有一个分区而已,因为没有准备太多的数据点进来,我们看一下文件数。对吧,按照咱们的设定,这个就是咱们发现的那个数据量较多的分区,那动态分区插入的时候,对于这个分区的处理,咱们就是将它打散,而且我当时打散的是五份,对吧?取了一个随机数乘以五,然后转成int。那这边就写,写出的时候就只会写出五个文件,那么假设咱们有其他分区的数据的话,是不是各个分区有只有一个文件了,那这样的话,整体咱们就解决了动态分区插入产生小文件的问题,可以说是非常高效非常好使的,呃,一个方式跟方法啊。
我来说两句