00:00
好,那接下来我们看另外一种使用场景,这个也是咱们的终极大招了啊呃,就是呢,很适用于各种倾斜场景,首先如果酒瘾存在大气,咱们可以将大气拆分出来,然后呢分别进行处理。对于没有大T的部分,咱们就正常一个就有大T的部分,咱们进行一个打散加随机数,然后呢,小表进行一个扩容。这样就可以了,那么基本原理我给大家简单描述一下啊,比如说呀,咱们两张表进行交引,这边有N个一啊,有特别多啊,这是一个字段,叫做ID对吧?ID字段这是一张存在倾斜大K的表,那么另外一张表的ID它只有123啊,比如说它很均匀,它没有基本没有倾斜,那这个时候两表进行交引,咱们可以这么做啊,首先它左边这张表我叫A,右边叫B啊,我A呢,把它拆成两个表。
01:03
怎么拆呢,你可以过滤就行了呗,对吧,大家过滤就可以得到了,那么这一块呢,一个呢,就是咱们的。包含大K的,咱们叫它A1,拆成这张表,另一张表叫A2,那这个时候咱们就可以选择什么呢?A2跟B。做一个普正常的一个纠啊,那这一部分的任务是不存在倾斜啊,就这个跟他做一个纠引啊。那接下来A1单独处理,但是这边有个小细节,A1咱们要跟B来做一个join的话,还比较麻烦。因为呢,咱们需要对B做一个打散,因为A1咱们需要打散,对吧,比如说这个我拼拼接上一个随机数的前缀,那后缀都是一对吧,那比如说我打散成三份,那前缀就是123,那么这一块是咱们自己添加的随机数,也就是说。
02:04
看这么多的一个一,这个值打散成了三份,加了随机数,那这个时候咱们跟原来的另一张表B啊,原本是要跟1JOIN上的,那现在就交不上了,因为它有了随机数了,T的值变了,所以这个时候咱们需要对另一张表的T做一个扩容,对吧?怎么扩呢?也就是说咱们要每一个词。如果你前面拆成三份,那你这边同样的要扩大成三倍的数据量,原来三条要变成九条,举个例子,一的话,咱们就需要把它扩容成一杠一,一呃二杠一呃三杠一,你看如果我们进行扩容,就原本一条,现在变成扩容成了三条,那这样跟原本这边的。打散后的K是不是就能交上来呀?那同样的道理,二应该被拆分成一杠二,呃,二杠二,三杠二这样的。
03:00
那三呢,就被拆分成了一个一杠三,呃,二杠三,三杠三。那这样的话,这张表你看这样是不是就扩容呢,对吧,扩容了N倍,前面打散成N份,那后面就得扩容成N倍,这样是为了能够呢,就上那。你可能会觉得我对一扩容就行了,我过滤出来,那你想想咱们这里举的例子是倾斜的key只有一个,那如果咱们倾斜的key里面有多个呢?对吧,要是咱们的倾斜key有多个,那这边可能也有多个值都得打散啊,所以呢,如果数据量不大,你就正常把它扩容就行了,那之后呢,这个A1呃打散对吧?呃,我们叫散的A1就可以跟谁啊,跟扩容后的B就扩B,好吧,进行一个交引,那这边是不是就能够很均衡的不会倾斜的得到一个结果,那这两部分结果咱们对对它做一个union,那不就搞定了吗?对吧,这个就是实现的一个思路。另外呢,由于咱们对这个T值发生了改变,对于下面这个处理,咱们是不是还要去掉随机前缀啊?
04:15
对吧,咱们去掉随机值。然后再进行优点,那这样呢。咱们就可以解决这个倾斜的问题,那么这种思想跟方法是特别好使的啊,特别好使,那我们看一下代码的实现。在咱们代码里面有ski,下面有一个ski join turning,好,那这边大家看一下啊,呃,前面我们也是关闭了一个map join啊,其他没做啥事了啊来接下来我们读取出来这三张表,那么基于咱们前面抽样的结果,咱们知道这张表存在两个倾斜的key,一个是101,一个是103,那我们现在是不是要。把这张表拆分成两部分了,一部分是101103,一部分是其他的T对吧。
05:05
然后其他的key是不是跟这两张表正常做一个交易呢,那这个key呢。跟谁呢?这个大T是不是得打散呢?对吧,要对101103打散,那么打散之后呢,咱们对应跟他join的这张表,也就是张c Co,它里面的ID,咱们得给它做一个扩容,扩容之后再交引,对吧,再去掉随机前缀啊,然后呢,再把它优联起来,来我们看一下具体的过程,第一步咱们进行了一个拆分啊,对这张表啊,Csc做了一个Fi,那这边条件大家看一下,呃,它是不等于101,而且呢。啊,这边应该用或对吧,啊,写差了啊啊不小心改到了对吧,不等于101或不等于103,也就是说K部分是不包含倾斜key的,那这边我把它命名为一个空门啊。
06:05
那另一块呢,就是咱们的啥呢。等于大K对吧,等于101。啊,不是刚才改改改反了啊,上面是雨雨对吧。就是不等于101,也不等于103,那下面这个咱们就是等于101或等于103对吧?那这个时候咱们就下面这个就是存在倾斜大key的部分,那我把它起个名字叫key啊csc,也就是说咱们对这张倾斜的表啊,把它拆分成两部分,那接下来呢,咱们要对倾斜的这一部分把它打散,那比如说这边我选择打散成36分,因为咱们的并行度是36嘛,那我就想着一个并行度一个,那么这样处理效率会高一点。那我们是调用一个map partition,那之后呢,我取出这个cos ID啊,得到cos ID字段,那这里应该是有一个101,一个103,接下来呢,我取了一个随机数,是到36的一个整形随机,然后呢,我封装了新的数据类型啊,Csc的这个数据类型,然后最终添加一个字段,看到没?咱们前面构造的这个随机数,把它拼接在cos ID前面,这个就是加随机数的一个方式啊。
07:32
这是打伞,那我们同样的小表要进行扩容,怎么扩容呢?用selfforx,那由于是扩容一进多出对吧,咱们可以考虑用flat map啊一进多出,那下面这里呢,其实呃,那像这一块呢,故事呢。对咱们的数据做一个转换跟封装,那核心逻辑在下面这儿啊。那么大家注意,你看。我是从零到36来做一个循环,因为咱们前面打散成了36份,那这边是不是得扩扩大三16倍啊,那我这边直接你看我对这个I值是不是也是有36个值啊,拼接在cos ID的前面,那这样呢就能跟大表打散后的这个随机key啊,能够对应上,你要确保他们俩能交引上啊,要确保他们能交引上,那这边得到一张新的这张小表啊,新表好,那再往后看,大家看一看,分两块进行交易,第一块呢是咱们的这个。
08:38
扩容后的表跟谁呢?跟这一张打散后的表。做一个处理。啊,就咱们倾斜就101103打散之后啊,跟扩容后的小做一个呃,Join引,那我们注意了,那这边我join条件是什么呢。Join条件是这个我们拼接的随机的cos ID啊,根据这个打散的随机值的关,呃呃,Cos ID进行一个关联,那其他就不用动了。
09:08
那在之后呢,正常这一部分咱们也要进行一个join,那这一块呢,咱们就是你看现在我是没有扩容的表啊,这是没有扩容的表跟不包含倾斜key的拆分出来的这一部分进行一个正常的join,这个按照正常的cos ID join就可以了,那这个时候咱们就得到了这两块了,对吧。然后我们对它进行一个union,就是咱们想要的结果,然后呢,我把它写到这张那个csfor详详情表啊,那这中间大家可能细心的发现了,我这边呃,加上随机数之后啊,就上面这个东西,这这个结果好像没有做去随机数的操作是吧,但是这边我取了个巧,同学们我在封装这个数据类型的时候啊。你看我返回的这个类型,一个是CS,一个是这个csc,给大家看一下啊,你看前面这么多字段是这张表原有的字段,我处理的方式是在后面加了一个字段,在原表的基础上加了一个字段,这样我就不用再去做去随机数的处理了,我join的时候用这个字段去做join,但是我在select字段的时候,我只select正常的字段,那这样咱们就方便多了,这样效率会更高一点啊,也就是说咱们预留了一个字段来放随机值,呃,拼接随机数的,然后关联,用它做关联,那最后取结果集的时候只取正常的cos ID,那就行了嘛。
10:42
对吧,那同样的道理,我们看另一张表啊,扩容的这张表我也是这么处理的,上面这些是它原有的字段。啊,它本来字段就只有这么一些,那么在这个样例类里面,我又加了一个字段,叫拼接的随机数之后的cos ID,这个也是用来做join的时候做关联的,那之后呢,咱们再取值的时候,取这个值就是没有随机前缀的那个值了啊,这是取了个巧,所以咱们就不用再去单独说对它去掉随机数啊。
11:16
这也是咱们为了一呃省点事对吧?啊,然后这就是咱们的实现逻辑,那由于我之前这里改了一下,我重新打个包啊,这个条件啊,重新打个包。打个包之后咱们执行看一下那个效果。好,稍等一下啊。好,打完了,那我把这个新的包拖过来。放到这啊上传,然后呢,咱们直接拷贝。
12:01
来提交。提交上去,咱们等他执行完,来观察一下它的UI界面。好,那么执行完了,我们在历史服务器看到对吧。来,点进来,点进来。哎,卡了一下啊。我们可以瞅一眼D啊,我们这种做法会产生特别多的stage啊,那其实大家可以看到这是不是这两张表的就业呢。对吧,这两张表进入到了这里,然后做完成了一个join啊,这是第一个join,然后再往右拉。是不是又读取了一遍这两张表对吧,也就是说他们分开的,呃,拆分成大T,然后还有一个扩容的新表啊,就分批join,那这一块是完成第二个join,那之后呢,再拉到最后面,在这里呢,他完成了一个优点操作啊。
13:05
那我们往下看看,有杀否的阶段,就是join的时候了,那这边我们会发现杀手的过程是增多了。啊。因为咱们都用次数变多了,我们看看有没有倾斜就行。那第一个你看你从这个执行的时间上来讲,基本上差不多对吧,这一块大家可以看到,那么再往下走看一下,这个数据量也很均匀,对不对啊,都是八九兆。啊,沙佛写都八九兆,都比较均匀,读的时候也差不多,那我们再看看其他的沙佛,呃,再找36的就行,因为并行度是咱们设的沙否并行度36啊,那这一块呢,基本也差不多,但有个别小的,咱们也不不用担心,像这种情况完全是可以容忍的,整体来看没有这种出现一条特别大长条,然后特别小条,像这种巨大差异的啊,没有,我们也可以看看读取的数据量,基本也都是几兆对吧,有个别小一点。
14:02
这无所谓啊,那基本上就没有什么太大的问题了,再看看其他的啊,这边是这两个。优劣起来了。看一下这是优年起来,那这个基本上我们也可以认为是大差不差的,也是没有那种防伪对吧,没有这种情况,那我们可以看下面的数据量的情况,杀否情况。它读取是不是很均匀对不对,其实都是6.7兆啊,差不多,那从这呢,我们可以看到我们这种精巧妙的方式对数据倾斜做了一个处理,但是它的代价是什么?咱们的杀手次数增多了,但是每次杀否是比较均匀的,所以这种适合什么呢?那种比较顽固的倾斜啊,咱们麦酒瘾也解决不了,对吧?啊,其他方式也不行,那咱们只能选用这种分而置之的方式来处理它,那这些代价是值得的,当你的数据量大到一定程度的时候,不好解决的倾斜,那这种方式就能够很好解决,比如说原来你一个作业一跑大几个小时,比如说跑了六个小时才跑完,那可能你换用这种方式,可能两三个两到三个小时就OK了,对吧,这个代价是非常值得的。
15:19
甚至呢,时间更短,半个小时就跑完了,再甚至呢,可能几十几分钟就完事了,这个都是很正常的啊,那这种方式呢,大家也可以当做最后一最后一步的选择,当其他方式都不好使的时候,咱们用这种能够很完美的解决。那这边也给了大家一个现成的实现代码,对吧,大家都可以仿照着去写啊,仿大的去写,你也可以不写DAPI,你写个circleq也行啊,那这边是因为咱们直接用复用前面代码,所以没有去做修改好,另外一个咱们提一一句,就是这个咱们如果你是3.0以上版本的话,倾写像这种随机打散的方式的话,它是可以自动实现的,但这个需要依赖于3.0的AQE功能啊,那这个咱们到后面章节会专门介绍啊,现在就是留一个印象啊。
我来说两句