00:00
啊,接下来我们聊一聊一个比较重要的内容,数据倾斜。那数据倾斜首先第一步就是判断一下它有没有存在数据倾斜,对吧,那这个呢,咱们在讲反压的时候也看到了啊,那个任务还没停。这些都关了吧。嗯。我们当时啊,定位到法尼算子是这个map。诶,然后呢,点一下萨task的统计,大家可以看到这边是存在一个数据倾斜的。那这一块怎么办呢?这里咱们能看到对吧。这这是很明显的,这是最直观的啊,你也不其他方式也可以看到分析出来这个。是最快,那么。接下来咱们聊一聊。这个。数据倾斜的一个分类啊,那其实咱们应该分为这么几类来了解,第一类呢。
01:02
击败之后的聚合。啊,就纯流式的聚合啊。存在倾斜怎么办?还有击败之后的聚合,但是同时我用了窗口又怎么办?那还有一种呢?KBY之前发生数据倾斜,他们以K为基准啊。有没有开窗,哎,做了一个划分啊。所以咱们不能简单来讲说数据倾斜,我就是二次聚合就能搞定了,并不是这样,那首先咱们来看看第一个啊,T败之后的聚合操作存在数据倾斜,比如说咱们做一个word,靠,是不是要分组,然后再聚合呀,对吧,那这种是纯丢失的场景,那如果用二次聚合,我们看看会有什么问题。看一下,这是我单独画的一张图啊。是一个流逝的过程啊同学们,那你看现在比如说是有HELLO1这种数据,那第一个呢,就是一个单词,那么你看当第一条数据。
02:04
如果你直接统计有倾斜,那现在我们是不是考虑打散了?打散成多少几份,然后聚合完。再往下游发对吧?好,那你看,那如果是第一条数据,假设我被打散到了黑下划线一,咱们是不是拼接一个随机数的前缀或者后缀就O了对吧?那假设这条这第一条数据被分到了一这一组。下划线一,那它是不是要本地聚合得到是一个什么结果,是不是就是hello_一啊。对吧,逗号一统计结果为一,那这个结果如果是流逝的场景,那它是不是直接又往下游发了呀。对吧,那直接往下邮发,那你看看他是不是接收到一条数据叫heade,在第二次聚合,是不是将后随机那个前缀或者后缀给去掉。去掉之后,你看他得到结果还一啊,这个没毛病,好假设再继续发第二条数据。
03:07
第二条数据刚巧呢,他也来到了hello_一这一组,对吧,那这个时候本地聚合它会干什么?是不是hello_一逗号二啊。是不是这条结果?对吧,因为他本地是不是一加一。等于二,然后发送给下游,那么下游之后去掉这个后缀下划线一,那他要做什么事啊?一加二等于三。你看大家可以发现的吧,存在什么问题?结果错了吧,因为你是流逝的,流逝就是来一条处理一条,来一条处理一条,就从头到尾每一个算子都是流逝的,首先结果是错的,第二一个你的数据量有没有减少,你一开始是两条要处理,最终这里是不是还是两条处理啊,所以这种场景你是毫无意义的。
04:01
那为什么咱们的MR以前讲hi Mr或者是B能够用二次聚合的思想来搞定呢?因为他们都是什么P处理,他们是不是有P的概念,他们是一批数据聚合,聚合得到一个结果再给第二次去聚合,它不会像流逝一样来一条处理一条,输出一条,往下游输出一条,那这个是不合适。那么要解决这种场景怎么办?嗯。咱们可以干嘛呢?记得MR也好,是不是有个概念叫提前或者盒?Cabina。对吧,也就是说在每一个map任务里边,咱们先提前把相同key的先把它聚合起来,之后呢再来做分组。哎,没错,就是利用这种思想,那我们简单把它称为叫local k by的思想可以吧?啊,Local k by在KBY上游算子,也就KBY之前。
05:02
我们先对它的本地,在本地进行一个预聚合。那这样数据量是不是减少了,对吧,预聚合完之后再发往下游再去汇总啊,这样就挺好的,但是你想想咱们那这。本地预聚合,但咱们数据是流逝的,无界的,流逝的,来一条来一条,那你到底是聚合几条啊?是不是还有这么一个问题,所以呢,这就要求咱们干嘛。必须是多条数据或者一批数据才聚合,也就是说咱们是不是得做整批的操作啊。对吧,你原先这个需求不需没有展批,但你要现在要自己批。因为咱们单条数据你跟谁聚合,你拿一条处理一条,输出一条,你跟谁做聚合,对吧,这个就不现实。所以我们必然要设计一个展P,那展P很简单,咱们用自定义,自己通过代码就能够实现了啊,所以你也不要慌。
06:01
这个是咱们给大家介绍。那我们直接上代码来,呃,Ski demo1啊这个。是咱们前面的一个小案例啊。不是UV的,我改了一下啊,前面的都是数据的处理啊,转换的过滤这一块你就不用看了。啊,就这一块的逻辑。不用看了,那最终这边是转换成了mid,还有E这种二元组,那接下来是不是word count的这种场景呢?Mid就设备ID对吧。好。那这边也不去从什么的呢,那接下来呢,你看我做了什么。呃,这边我是一个代码来实现logo key by跟不logo key by的效果啊,所以我是做了一个分支判断啊,我通过传参来控制我走哪个逻辑,这个不影响啊,那我们看看一个。传统的。
07:03
你看我这里做了一个什么flat map,那在flat map我又会做什么呢?这边我会去展P,哎,咱们的展P跟本地聚合就是在这里实现的啊这边给。大家写一下注释啊。啊,实现logo的功能。就到这一块。那我们带大家看一下逻辑是怎么写的。那首先呢,我定义的,特别是一个构造器啊,传一个参数,你不是要展P吗?那展批就得有批次大小,你不可能无限期的涨下去。好了,那么接下来我会。为了容错对吧,比如说你自己展批定义一个变量可以涨,没毛病,但是挂了之后你攒的数据没了,那咋办对吧?所以要定义一没实现check。
08:07
然后呢,我就定一个状态,到时候呢,就是将批次数据啊,就将它存到状态里,对吧?啊做一个容错处理啊,这没啥,那接下来本地展P,我定义的是一个哈希map。然后把前面这个关了,一直吱吱响啊。哈希面板来存储咱们的数据。那接下来这个批次大小啊,另外一个就是当前已经攒到多少条数据了啊。这是当前已经多少条?就这么简单,几个变量,一个。纯数据,一个批次大小,一个当前大小,当前条数吧,你理解成条数,那接下来就是代码了,逻辑就是这么简单,就这么几行,你看。呃。这个buffer。
09:00
我是不是取出它的值啊。然后把它加一对吧,就记这边是计数嘛,将新来的数据把它,呃。加一,并且呢。把这个大小啊。给他拿到。哦,不是啊,这个是什么呢?这应该是这样啊,咱们先获取之前buff份里面存储的统计值对吧,我们是不是沃count,是不是来一条加一,来一条加一啊。那这个,呃。这应该是value.F0。这是不是mid,也就是说以mid当做key,然后统计值当做value,就存在本地的buffer里面,那我们先获取一下之前的,如果没有就以零表示,对吧,然后呢。
10:03
当前的这个key是不是再存进去,然后count加一啊。就统计嘛,这个是哪家啊在哪家,咱们的统计值就本地一聚合。E就是本地句子。相同key嘛,因为这里没有做KBY来的数据,哪个key的都有,所以咱们用哈希map来存储啊,Key呢,就是咱们未来要做KBY的key。好,那么再往下是不是要判断批次啊,那我这边用的是一个计数器,它我先调用了一个方法,这是自动会加一的啊,你看啊。这里会加一。当前来一条嘛,那我当前的就加1A之后跟咱们的批次大小做一个比较,如果大于等于批次大小,也就是说已经达到了咱们要攒的批次,我就什么呢?便利八分钟的数据发送到下游。
11:06
对吧。那么这边遍历的话会有问题吗?不会,咱们是哈奇map法啊,你想想我同一个key,比如说同一个MID123,我累加完put进去,是不是覆盖掉了,只存一条最新的结果,是不是一个key一个mid只只有一条结果,对吧?那咱们这里可能是有多个mid的统计。那我编辑出来,然后把它get出来用发送给下游,这里就是发送。那最后呢,你发送一批,那你对应的这个缓冲buffer也好,还有这个统计的这个值也好,要重置一下。这个逻辑很简单啊,逻辑很简单,一个是本地累加啊累加接下来判断批次,到条件呢,我就发,发完之后记得把这些本地本地缓存的一些东西把它清掉啊没了,逻辑就这么简单,核心逻辑就这样,那至于下面这个是什么呢?下面这些是我考虑到咱们容错性的要求,咱们checkpoint function的两个方法,把它重写了一下啊,首先对状态做一个快照。
12:12
那这个时候是不是要给状态赋值啊,对吧,然后赋值的话,我先是执行了一下清空对吧,防止一些呃错误,然后呢,我把这个buffer的buffer,把buffer里面的数据添加到这个状态里面去,那回头他做check po就可以把这些buffer的数据备份下来了。接下来是恢复的时,恢复状态的时候,也就是说啊,重启了从checkpoint恢复到本地啊,那怎么办呢?咱们是不是要将状态里的数据重新赋值给呃,Buff logo buffer跟当前大小啊,也就本地变量啊,从状态取值给本地变量,那你看我这边是初始化了这个buffer,然后呢,判断一下。再往下。遍历这个历史的状态,遍历一下,然后是不是取出来。
13:02
Get出来。这一步大家看我为什么加这一步呢?我是多考虑到了一个事情,你可以不关注啊,就是后面这两个方法,你可以不用,呃,不用关注,核心逻辑就在上面。这个容错的考虑是什么?为什么不是刚吗?为什么这里还要get一下,回头还要加回去?这是咱们考虑到调整并行度,咱们用的是什么?List state。那回头比如说我原来并行度是二,我调整成三了,它是不是会均分,把这些例子的状态的数据均分给三,均分成三个对吧,那这时候均分就可能什么。可能出现,虽然你这里。呃。是不是有多份呢?可能出现同一个mid有多条数据啊,正常来讲咱们是不是一个mid有一个KV啊,也就是说m mid123只有一个统计值,比如说是十。
14:08
只有一个吧,但是你重新分可能是原来属于三个不两个并行度的,举个例子啊,本来这里有一个mid为123,它的统计值是十,那么在另一个并行度上面,它也有统计的123,比如说统计的是九。那你重分的时候,是不是可能这两条数据到一起去了,那这个时候你是不是还得重新把它累加起来十加九啊,哎,就是做这个事儿。正常是刚初始化没错啊。那我先get一下。你不是在便利嘛,对吧,你可能有两条嘛,啊,所以要先get一下,如果你只有一条,也就没有咱们这种重复m mid的情况。啊,那就默认只给个零就行,所以呢。咱们除了取出状态的值之外,还要考虑到这一点啊,把它加回去那就行了呗,啊,你别看只有两行代码去考虑很多事儿啊。
15:03
那么。接下来呢,就是咱们恢复的时候,默认buffer数据已经达到了batch size。对吧,你看我这初始化计数器的时候就是按Bach size给他的,这样的话再下一条是不是就按零开始啊。呃,再往现有的这个批次直接怎么样,直接发。现有的批次先发出去嘛。这个设计那如果啊没存过,那我们就从零开始,你看代码量不多啊,但要考虑的事儿比较多啊。之前这里改了一下,咱们打个包。分别来给大家演示一下。打包完成我拷贝一下。好了,已经上来了,嗯,那我找找这个案例啊。这个是初始案例,这个是不使用的,后咱们看一下同一个案例啊。
16:08
做一个对比啊。这个是不使用logo就没处理未处理。咱们mid是存在倾斜的,来打开,打开之后同样的道理。而应该是在哪有啊。是不是key reduce?K之后,然后倾斜嘛,我这边的代码是按照mid做一个。嗯,我看一下。你看按照mid。作为一个分组。二元组我们说的是MID1嘛,这种F0就是mid。好,直接看这里就行了,你看这是不是倾斜呢。这就是咱们的reduce聚合啊。那在这呢,你能看到其他都几百K,这个都十几兆了。
17:02
这就是一个效果。倾斜的现象啊,这个咱们前面都看过了。呃,那接下来。咱们再重新提交一下。这个我改成醋。你看在架包后面是不是还可以指定一个main参数,对吧,这main方法里边传参嘛,这边是利用到了flink提供的那个parent to。这个parent你from as就是may方法的A,然后呢,你通过get的方法就可以直接提速了,还可以设定默认值,说白了就是提供了一个工具类,方便一点啊。那这边如果我把它制成醋,它就会走下面这里对吧,因为我这边是加了一个非啊这代码逻辑啊。那就走下面,下面呢就是走local k了。我提交。
18:11
好,现在有了,我们看一下是不是直接看reduce就行了,就看这个对吧,Reduce等它开始运行起来。好了,看一下这边的统计的数据量。这个应该不是啊。等一会儿。好了。你看现在是不是就比较均匀呢。再等一会儿。你看都是100多K吧,一百三一百四。对吧。这个就是咱们通过local key by本地展P本地展批,并且提前combine之后呢,再做key by的方式啊,也就是说其实是这个地方。
19:03
Map完成的这个事儿。再看一眼,你看很均匀吧。这就是咱们的实现,另外可以给大家提一嘴的是什么呢?大家可以看到咱们目前用data stream API来写,是不是要自己实现这个逻辑啊,要自己实现这个展P本地聚合,虽然逻辑不复杂啊,但可能你不想自己写啊,那么恭喜你啊。咱们flink circle。提供了相关的功能,也就是说如果你用的是flink circle啊,这个所谓的展P,它有一个功能叫mini batch。那本地聚合它提供了一个功能叫local global。这些功能只需要设定几个参数就OK了,那你就不用再自己去想,想方设法实现这个展P一下预具盒。
20:02
使用搜狗很方便,咱们在后面的章节第六章再去介绍,你看mini batch global。好,那关于这种K败后的聚合流式的处理怎么来解决呢?这个问题咱们就介绍到这里。
我来说两句