00:00
那首先我们来聊一聊第一种类型,单表数据倾斜优化,哎,就是说咱们处理的只有一张表,那这一张表要产生沙Le,肯定不是join呢,那就是什么呢?咱们会做了一个group by group by,然后比如说你再做一个聚合,因为grew by会产生一个shuffle,对吧?就那就可能产生一个倾斜的情况,还有如果是RDD呢,那咱们就是调用了类似group by key reduce by key AG get by key这种分组聚合算子啊,产生单表的一个shuffle,那这种其实不,呃严格来讲不太需要咱们去关注,因为咱们也讲呢,对于Spark而言shuffle的时候。是不是聚合的话,他是不是一个哈希aggregate,我们前面也看了那么多案例,都知道一般它是成对出现的,那么首先它会分割为一个is exchange,这个是不是代表shuffle,那前面有一个哈希aggregate,后面也有一个,前面看了很多对吧?那第一个我们得表示的是同一个分区内部的预聚盒,它本身会帮我们处理一个预聚盒,预聚合之王沙uff王在对重新分区的数据进行全局的聚合,啊,就真正的对这个每个T进行聚合,那这样的话,其实呢,基本上就不会有一个倾斜的问题了。
01:24
这个也是Spark,搜狗这块帮我们做了一些优化处理啊,这个也是非常好用的,那但是呢,有一种场景还是不够好使。预聚合是一可以一定程度解一定程度解决倾斜的问题的,包括hi,咱们使用hi的时候也一样,它也有一个叫提前combine,但是如果咱们倾斜的key大量分布在不同的map端。那可能就效果没有那么明显了,举一个例子啊,咱们就以那个101 103是清洁key对吧,就比如说咱们以101为例。如果咱们一共只有两个map端对吧?那比如说这里存在了大量101,这里也存在了大量的101,那其他的像什么幺零几,幺零几那个就无所谓了,都只有一两条对吧?那比如说这里有很多条,有20万条,这里有30万条,其他都只有,其他的key只有一条。
02:21
那正常来讲,如果不提前预聚合,那么这个101肯定要进入同一个分区,对不对,同一个task,那这边下游的这个就要一次性处理,比如说50万条,那这样其他的分区,比如说幺零不管幺零几啊,其他的可能处理啊几条就行了,那这个就是一种倾斜啊。那提前预聚合他帮我们怎么处理的呢?哎,他会首先在map端这里先自己进行聚合,这边他就会得到诶101,比如说有20万啊,这样子就变成一条聚合结果了,那这边同样变成101,比如说30万啊,那下游的这个分区,它就只需要读取这两条数据。
03:08
把它相加,那就得到幺零幺五十万这么一个聚合,这个就是提前预聚合的好处啊好处。那我们刚才也说了,如果你是大量分布在N个map端。那其实这个效果有限,你想想每一个map是不是咱们预聚合,是不是只能预聚合同一个map内部的。对吧,也就是说我这个map的数量足够多的话,每一个都得产出一条,那最终你在进行汇总的时候,还是得汇总很多条数据,但相对来讲也还好啊,像这个时候咱们就呃,如果真的有这种场景啊,大量分布在多个不同的那个分区在上游的时候,那我们可以自己实现二次聚合,二次聚合这个思想也是解决数据倾斜非常经典的一个思路啊,先加一个随机数。
04:06
进行一次聚合,然后呢,再去打伞。呃,咱们就画张图啊,就比如说还是那个101,咱们随便画几个,这里有101,这里有101,这里有101,好,现在咱们也不管玉具盒了,如果咱们一次性处理是不是要拉,比如说50万条数据过来啊,这样压力比较大,对吧?当然50万不算大,咱们只是举个例子啊,你就假设它是比较大啊,那二次聚合的思想很简单,我聚合的时候将101打散成几个组,比如说我打散成三个组,我加随机数。那就会变成有101_一,下划线2101_三,就把key拼接成这样,那这样的话,咱们去group的话,它就变成了不同的分组了,那比如说呃,被随机打散到下划线一的有13万条,下划线二的有20万条,下划线三的有那个。
05:02
20。7万条对吧?啊,或者再再再给个20对吧,然后这个给个30,那是不是在这三个地方他各自聚合一下得到一条结果,然后呢,咱们再对后面这个随机数把它去掉,去掉之后也就是咱们真正原来K的样子再进行聚合,那这边是不是只需要聚合这三条结果就可以了。那我们就能得到幺零幺五十这么一个统计结果,这个就是二次聚合的实现,那这边怎么写呢。咱们代码里有对吧?啊,我是通过自定义udf的方式来实现加速艺术的啊,因为这样方便一点,用RDD去写也是可以的,来,嗯,那打开咱们的这个类啊,在咱们给大家准备的ski这个包下面有一个ski。这就是单表聚合的情况,单表倾斜。那我写了两个circle,一个是简简单单的做了一个group by。
06:07
做了一个sum啊,这个就是没有二次聚合,那上面这个呢,就是咱们做了一个二次聚合打散的一个实现代码,对吧,大家也可以做一个参考,那我们首先呢,我也不打包了,因为我要快速的对比啊,我打开一个logo星,然后呢,下面circle我执行CIRCLE2,也就是说咱们现在只跑它就行了,那这张表咱们通通过咱们刚才的抽样,咱们也知道cos ID是不是有两个T特别大呀,一个叫101,一个叫103,对吧,但是呢,咱们也说了。呃,Spark磁口是不是会帮我们提前预聚合呀?所以可能效果没有那么明显,但多多少少能看出来啊。我们来执行一下,下面我加了一个死循环,就为了让任务不停止,咱们直接看四零,四零好执行。
07:06
啊,这个包装是页面还没好啊,你可以看一下对吧。这个不影响啊,我这面再刷一下就好了。好,现在OK了,现在在读取数据,读取数据等一会儿好了,呃,大家可以看到这个执行时间都非常的短啊,效果没那么明显啊,因为他会提前预聚合嘛,我们看找找能不能看到那个效果。这个也还好,不算特别的,那个倾斜效果没那么明显。看看这个明不明显。这个也不明显。
08:00
那之所以不明显呢?其实同学们也很简单这个道理。嗯。咱们看这个搜狗执行计划,咱们用这个数据量也不大,说实话啊,看执行计划在读取这扫描表列裁剪,然后这里是杀否,那么我们可以看到他在杀否的过程中啊,在杀否之前执行的一个哈希aggregate,也就这个就是提前预聚合,所以已经提前帮我们聚合掉了,那后面这个呢,是不是杀uff否完之后再真正聚合了,对吧?就。他其实自动帮我们做了这个,所以一般场景咱们不用去做啊,不用去做,那我重新执行一下,看有没有效果明显一点,看得到最好,对吧,等一会儿。因为咱们这个数据集比较简单啊。
09:03
而且加上b circle本身帮我们做提前预聚合,所以倾斜呢,不会那么明显啊,还是这个毫秒级,那估计没什么戏了。那呃,咱们简单看一看嘛,从倾斜呢,咱们可以从几个地方来看啊,第一个就是这个长条。比如说这个是个长条,其他都是短短的,其他都短短的,那可以说明有一个发生了倾斜,另外一个呢,咱们可以看看这个shuffle瑞的这些数据量。如果这个数据量非常的不均匀呢,现在比较均匀对吧,因为有提前预聚合了,如果不均匀你从这儿也能看到啊,有些比如说明显特别大,有些比较小这样。从这些地方咱们都能发现。好,那接下来我们看看怎么解决好吧,啊,咱们看一下这个,如果说真的就是提前预聚合也不行的。
10:05
那怎么办呢?对吧,咱们执行SQ1,那这一块呢,我是先定义的两个函数。那这边逻辑很简单,这个叫添加随机前缀,那这边就采取随机数啊,取一个整形。那这个整形范围就是咱们传进来的对吧,这个参数啊,然后呢,在拼接上原来的数啊,用一个下划线拼接,这就添加随机前缀,另外又定义了一个叫删除随机前缀,就是用split根据下划线切割取后面原来的值,那定义完方法,咱们想要用的话,必须在session里面。去注册对吧,对这两个分别进行了注册,那注册的函数名一个叫这个,一个叫这个好,这个是咱们资本背景,那么接下来呢,咱们瞅一瞅啊,第一步呢。啊,这是第一步,同学们,咱们查询这张表,然后呢,将cos ID这个字段拼接上到六的随机数。
11:09
对吧,打散成六份好,这是咱们第一步做的事儿,那么第二步呢,是这一块。这是第二步,第二步呢,咱们对这个拼接完随机数的key做一个group by,然后呢,进行了一次聚合,这是第一次聚合啊,这是第一次聚合。打散之后的聚合,那在接下来咱们看第三步。就是这一步啊,这第三步,第三步呢,咱们调用了一个去除随机前缀的方法啊,咱们自定义的那个,当咱们前面拼接的随机数去掉。那最后一步是这个。咱们是group by的真正的key看到没啊,这个才这才是咱们最原始的key,去掉随机数,然后呢,再对刚才取得的这个第一次聚合的这个字段呢,再聚合一次啊,再聚合一次,这就咱们的第四步。
12:14
这个就是二次打加随机数打散二次聚合的实现,那么其实这个circle口是不是跟那个have也很通用啊,对吧,都是一样的写法,来我们来看一下效果吧。但是由于咱们是封装了很多个子查询呢,那么对于咱们这么小的数据量,你会觉得执行时间变长了啊,因为job数呃,他这边子查询变多了嘛,但其实咱们如果数据量足够大的话,这它的作用就体现出来了,我们来感受一下。刷新一下local host页面啊,4040啊在读数据了。
13:04
稍等一会儿。好了,执行完了,那我们肯定是找后面这两个瞅一瞅,他跳过了很多作业,很多task。看一下这个。可以看到基本上,呃,看着也算均匀吧,啊这个。再看看上面这个。你看这个相对就均匀很多了,对不对啊。而且空这个读的数据量都接近20K。如果咱们的数据量还更大一点的话,效果就更明显了啊,就原来倾斜会明显一点,那现在呢,大家就。呃,感受一下怎么写啊,知道怎么来写,那一般呢,咱们如果是Spark circle,由于提前预计和帮我们解决了大部分场景呃的一个问题,那么其实咱们很少能用上这个方式来做了,就不太必要。
14:12
这是一个单表数据倾斜的一个情况。
我来说两句