00:00
好,那我们看下一个K之前发生数据倾斜,也就是说也不管你后面要不要聚合了,就之前。比如说你做map的时候,已经是存在着倾斜,那这个是怎么发生的呢?很简单,咱们消费的数据源,比如说是卡不卡的话,比如说它有三个分区。那我们我们理想状态1:13个并行度对吧,啊一一消费。那这时候就有问题了。由于卡不卡,每个分区之间的数据均不均匀,这是不一定的,也就是说你生产者如果发送的时候,它的分区啊有有。有一定的逻辑,可能主要数据发往这两边,那这边呢,可能是比较少。那回头咱们来消费的时候,是不存在这这个数据量大,这个数据量大,这个数据量小,或者说就其中一个大啊,就是它大,其他都数据量很小,像这种情况是很可能的。另外还有一种情况就是什么呢?你不是1:1,就像咱们前面啊也提到过。
01:04
啊,我几个分区啊,但是呢,我只有两个SS的并行度。并行度等于二,那这时候是不是应该有一个要有一个并行实力,要消费两个分区啊,有一个消费一个分区,这种呢,也是倾斜了,那对于这种倾斜的处理是极其的简单,特别简单。咱们只需要使用一个重分区的算子就可以了。那从分区的算子,咱们是不是有八大分区器啊,那其中呢,咱们就可以用这么几个,用re balance或者效率更高的risk care risk care可以减少一些网络传输啊,性能的效率更高一点,Care啊,Re balancell,甚至呢,你想用杀否都可以啊,重新打乱啊,它分区器呢,是基于随机数的。这个大家应该都看过。这个咱们就没什么好聊的,咱们主要聊后面这个批之后的窗口聚合存在数据倾斜。
02:01
对吧,那相比于第一个的区别在哪,它多了一个窗口。那窗口我们说它的作用就是什么。无界的数据。也就将无界的流把它切割成一个一个有线的块,说白了像是像不像咱们长了一个皮呀?并且咱们窗口有一个好处是什么?是不是只有窗口触发计算的时候才会输出一条呃,计算结果。对吧,这样就避免了咱们之前分析的流逝二次聚合那个数据量没减少的问题,对吧。所以呢,这是非常方便的,我们来解决很直很直接二次聚合。那二次聚合就,呃,很简单,很常规,跟其他的计算引擎处理的思路是一样,第一阶段我将咱们的key拼接一个随机数的前缀或者后缀。对吧,按照这个随机的key我进行keep败是不是打散了,但是打散它就可能处于不同窗口,对吧,那我们开窗正常的聚合,聚合完之后咱们还是不是需要把打散过的那些再回呃,回头再聚合起来。
03:13
那这边咱们要注意的是什么呢。咱们窗口聚合的结果。呃,你你是不是因为这个窗口,是不是时间在在推进窗口,呃,越来越多,越来越多,对吧,你第一个窗口的结果是不是不应该跟后面窗口的结果,比如说你你一个滚动窗口,第一个是零到十秒。呃,第二个窗口紧接着是不是应该十到20秒,好,你零到十秒的数据啊,比如说那个G,你把它打散成三份,即使你再怎么打散,这些数据是不是仍然是属于零到十了。也就是说你不应该将这个窗口跟下一个窗口的数据啊,把它聚合起来,这是错的啊,应该把属于同一个窗口区间的给它合并起来。那所以呢,咱们开聚合完,其实就变成一条普通的流了,也就是说没了窗口的概念,所以呢,咱们需要什么将数据拼接上一个窗口的时间,窗口的标记,那一般呢,咱们用窗口结束时间啊作为一个标记。
04:15
之后呢,咱们只需要按照这个什么呢?原来的key和这个窗口提出时间key代,那原本的key跟同一个窗口数据是不是到一起去了,对吧?啊这样这个思路大概是这样子。思路呢,啊还是比较简单,咱们看一下代码。那我们看一下提交一个原始案例,然后我们直接来看代码啊,这个是一个DEMO2。提交好吧,抽代码来。让他先跑着啊。呃。看一下啊。其他关了。好,前面呢,同样是一个准备数据转换成二元组mid跟一这种形式,好,接下来是这么一个需求。
05:04
按照mid分组统计。每十秒个。Mid出现的次数,看到这个需求,咱们是不是要按照mid败之后呢,开一个十秒的滚动窗口啊,之后再做一个聚合统计就可以了,就这么简单一个需求。呃,那这边原始案例呢。咱们代码是不是就这么写,现在是一个二元组对吧,那么是不是T第一个元素,然后呢,开一个十秒滚动窗口。在之后呢,直接聚合输出,哎,这个是咱们实现这个需求最简单最直白的一个写法。那么看看这么写的,这个需求实现起来有什么问题?咱们照的数据里面mid是存在倾斜的。所以我们看看这个window啊window,呃。
06:00
Window聚合这里来,大家可以看到这里是不是有数据倾斜了。你看有一个子任务,它是50兆的,60兆了,其他才多少?三兆差了整整20倍。这个很明显有一个倾斜的问题。好。这是案例,那咱们怎么解决呢?怎么二次聚合呢,说那么多对吧,咱们来一个实际案例,大家注意啊,我现在是这里。这边做的。在这里呢,咱们首先是进行了一个map map干啥呢?拼接随机数啊。你看我把原来的K是不是F0。拼接上的这个分隔符,你随便定义对吧?啊,一个连接符,然后取了一个什么随机值。我把这个key给变了,变成这样子,原来只是一个mid。啊,那现在我拼接上随机数了啊。
07:01
那这个随机数随机的范围。随机的范围我是传参传进来的啊,就传了一个random number,这参数名自己起的,反正就这么来指定啊,然后没变,这是拼接随机数之后呢,击败按照咱们。这个方式来分组。Mid拼接上随机数的啊。做了一个分组,分组之后咱们正常开了一个窗口。在接下来reduce啊,这边就比较关键了,这边怎么聚合呢,那首先第一个。函数。是不是reduce function这个呢,是不是就两两相加对吧?哎,把它F1相加,就是把这个一这个值相加,T相同的相加,那第二个函数我们知道是不是一个全窗口函数对吧。像这种是支持传两个函数的,那么它的作用应该大家都了解,第一个函数是定义它怎么实现一个预聚合,也就reduce是不是来一条算一条,来一条算一条,来一条计算一条,那最终呃窗口触发的时候,它不会直接输出结果,而是什么呢?会将你聚合的结果。
08:11
再传入到这个全窗口函数,那么全窗口函数再处理完之后才真正的输出该窗口的结果。啊,就提到这么一个,这是这么一个用法,对吧,那后面这个全窗口函数,我就没有再写个类了,直接写个匿名实现类,那么大家看一看。我这个是T啊,不直接看逻辑吧。你看它的数据是一个迭代类型,对吧,因为全窗口函数默认呢,应该是整个窗口的数据一起处理,但我们这个用法这边聚合完的结果。同一个P是不是只有一条啊?对吧。那每次进入这个process方法的只会是同一个key的结果,对吧?嗯,那么接下来我就是迭代出一条,把它迭代出来就行,N,那就拿到了之前的聚合结果啊,就是上面的聚合结果。
09:06
然后呢,通过上下文取到窗口的结束时间,时间戳。好,现在你看我封装成一个TOP33元组,那么第一个呢,还是key啊,就是打散的那个key对吧。这第二一个是这个T对应的统计结果,第三个窗口结束时间,这个全窗口函数核心就做了一个事。把咱们窗口标记给携带上,回头呢,我就能根据窗口结束时间做K,这样是不是可以把同一个窗口区间的数据把它汇总起来了。好,就是咱们的第一次聚合,拼接随机数K聚合打标记啊,这做了事,那接下来我们看看第二次聚合。首先呢,咱们是不是得按照原来的key呀啊。这边你也可以不用写map,直接就行,但我呃,为了写完整一点,我就特意这么一步步写。
10:03
那我们看map里面做了什么事呢?我是不是根据这个分隔符做了一个切分,取到原来的key对吧?Key,然后再把它封装回去。原来的T。然后呢?统计值,还有窗口标记好,再接下来KBY按照什么keyy,这是不是原来的KF0对吧。原来的key还有一个按照窗口结束时间也就标记这样的话。就把原来本应应该属于同一个窗口,却被我们打散的结果可以汇总到一起了。之后呢,咱们再把这几个把它汇总一下reduce。再把它加起来,把这个值加起来就行了呗。对吧,统计值加起来就行了。然后就完事了,那以此呢,就实现了二次聚合,那我简单画个小图,咱们来理解一下呗,嗯,我用这个吧。
11:00
要不然没法保存。呃,比如说原来我的窗口。呃,咱们的mid,比如比如说是这样mid。啊,123。这是原来的mid。那我比如说我打散成的三份,那会变成什么呢。我打散成这样。通过咱们的代码,咱们我是用一个减号拼接对吧?啊拼接一个一。那这个就是拼接一个二。那这个呢,拼接一个三。啊,也不是三个零吧,咱们不是012吗。啊,其实这个无所谓的啊。是原来一个mid是不是会被我们它的数据,很多条数据都会被打散成三份,对吧,那原先呢,可能是只有一个窗口。
12:06
原先这个窗口是不是应该是呃,零,比如说十秒对吧,零到十的窗口,比如说原来统计结果应该有1万条。啊,或者。10万条吧。原来统计结果是10万,那我们说比如说它的倾斜对吧,那打散会变成什么呢。这个K它就变成也是一个什么呢?呃,零到十。他可能只有3万条。那下面这个是不是也一样。再下面也一样,对吧。那可能这个是4万条啊,随便给啊。原先是这么来统计,现在变成了三个窗口来统计,就打散成三份,但是呢,他们的时间区间是不是一样的,对吧,那这个窗口是不是会输出一个3万,这个输出一个4万,这个输出一个3万。
13:06
那这条数据咱们是还需要把它汇起来键,就怎么它汇起来,我们汇是个口,也就说属于零到十的才要汇总,你十到20的是不不应该汇总过来是吧,那所以咱们要批什么呢。击败原来的key,也就MID123,另外还有一个事儿,是不是击败十这个窗口结束时间,那这样这三条数据才能到一起去,才能汇总起来对吧。这就是咱们二次聚合要注意的一些小细节,就关于这个窗口结束时间标记的问题。是。呃,我们再提交一下新的这个啊。原先这个咱们看过了对吧?啊,这里是有倾斜的。截个图。然后把它取消。
14:00
嗯。咱们提交新的这个案例啊。好提交上去了,而且呢,大家可以看到我这边。后面插呢,开启两阶段提交,另外随机打散成16份啊。嗯。多了一个。看看。这边忘了这个。忘了这个。那现在这个任务就。不准确了。我把它停一下。少了一个这个杠啊,让它变成回车的。
15:01
我们再重新提交。好,刷新。等一会儿。有了。你看,这是我们第一次的窗口聚合。第一次的窗口聚合,咱把它打散之后再聚合,看一下接收的数据量。啊。好,我们首先看这一块呢。现在看起来是不是还是比较均匀的,因为咱们通过随机数将它打散的。你看现在都是两兆左右啊。两兆左右。对吧。相对呢?跟以前相比是不是均匀多了,以前差了大几十倍呀,那现在基本差不多,那你再看往后这里呢,应该是咱们。
16:06
去掉随机数,重新踢。重新reduce看一下,这里处理的数据量是不是很少啊?对吧。你看前面都接近十兆了,这边每一个才两兆,因为是提前聚合过的。这个从这个呢,咱们就能看到一个直观的一个效果。另外要提醒大家的一点就是。咱们这打散成几份合适?这边要注意这个随机数打散成几份,你要自己去测一下。因为K的分区逻辑并不是直接取哈希值的,咱们那个建它是有一个建组ID对吧,通过两次哈希得到一个建组ID,它会乘以下游的并行度,再除以最大并行度,默认128,它是通过这么来算的,那这么来算的话,你你这个打散不同的分数就。
17:04
影响就不一样了。来,自己去试一试。我是试出来我当前这个代码,这个资源16是比较合适。这个就是咱们的处理,那么circle怎么写呢?呃。并没有这种。随机数聚合,然后再二次聚合的没有,但咱们可以干嘛呢?看一下这个搜狗写法。思索写法很简单,这个呢,同样是谁呀啊,是张军大佬的啊。张军大佬写的好。你看啊,他是这边是不是开了一个滚动窗口啊,它是一分钟的啊,之后呢,做了一个。Count。对吧。啊,也是group by count,然后开了一个滚动窗口,那存在倾斜,其实案例跟咱们很相似啊,那你看他怎么改写的circle呢?很简单。
18:11
这是第一层。这是第一层在干什么事?加上一个随机数。你看他这边做了一个拼接。这是原来的分组字段,拼接一个连接符,拼接一个随机数。之后呢?这是第一步。第二步是不是啊,是不是正常开窗对吧,然后呢,按照拼接好的随机数。拼接随机数的这个新T,它作为一个分组,分组完之后它得获取到什么窗口结束时间,思路跟咱们一样吧,为什么呢?他是不是希望把同一窗口的数据汇总,你必须有一个窗口标记,对吧,窗口结束时间是比较常用的啊。另外呢,是不是先第一次聚合。
19:01
就第一次聚合,而且呢,同时还打上了标记好,再接下来第二次聚合干嘛呢。是不是按照窗口结束时间还有呢,原本的T对吧,它是不重新把它切分开了,按照原本的T再重新分组之后呢。把它聚合回来就可以了。把它做一个汇总,你看这就是一个思考写法。给大家做一个参考啊。
我来说两句