00:00
现在已经把这个PV的需求做了一个简单实现,那有些同学可能会对这个有所疑惑,就是这里边我们做了一个操作,就是干什么呢?把所有数据都分到了同一个组对吧?哎,大家看到这一句注释的时候,肯定心里边会有一点疑惑,就是诶,如果说你这里边把这个数据都分到同一个组的话,假装看起来好像是KY之后做了一个time window斗啊,那你这都全分到同一个组,这不就跟那个time all是一样的吗?之前我们说不推荐大家直接基于当前的这个data stream去做time window2window all操作啊,我们当时说做了window all的话,就相当于会把所有数据分发到同一个组里边去,那整个程序的并行就就没有意义了,那如果说现在我们还是这个做了一个KBY,只不过这个KBY呢,呃,是同样的一个K啊,那这这里边这不就相当于还是同一个组没有区分开吗?如果要是做并行的话,那是不是就相。
01:00
当于并没有利用起我们当前的所有的资源呢,哎,这我们可以做一下测试啊,当前我的这个并行度设了一啊,那大家知道我如果要是直接把这个注掉的话,默认应该是按照CPU的核心数量,对吧,在当前的这个开发环境里边,那我当前应该并行度是四啊,按照我的这个核心数量,我们来运行一下,看看当前的这个代码有没有什么其他的状况出现。好,现在已经代码提起来,我们看看这个并行度调成四之后,相当于我们重新设置了一下四啊,看看最后的结果怎么样,好,大家看到现在已经全部输出了,大家发现这结果是什么呢?尽管我们有四个并行度对吧?啊,大家之前也都看到看到过,我这边电脑本身是四核,那么默认开发环境里面不做并行度配置的话,应该并行度是四,但是现在大家看到全部都是这个一号分区对吧?或者我们认为就是一号slot上输出了所有的结果,这就是因为我们前面做KBY之后,就是一开始我们输入的时候,这里边做map操作的时候,并行度应该还是四,对吧,这里边我们是并行的,但是呢,到后边filter这里还是四,这里map还是四,但是经过KPI之后,全部发向了同一个分区。
02:28
啊啊,所以后面我们做的这个所有的操作,其实就就都已经是啊,就是所有所有数据都不会再做这个并行处理了,都是同一个分区里边处理的结果,那我们认为现在的并行度是四,其实每个lo里边还是有这个任务的,对吧,只不过没有数据来,这个任务就白费了,一直在那里空转啊,所以这里边我们其实是要对这种情况做一些改进和调整的,这其实在实际的生产环境当中,这种行为其实非常的典型,就可能不是说我们把所有的数据都分到了同一个分区,但是也有可能出现什么呢?你做KPI的时候大家会想到啊,比方说前面我们做这个。
03:10
就是这里边hot page,我们这个k buy URL,或者是前面我们hot it,这里边我们k buy item ID,那大家会想到假如说某一个商品或者某一个URL出现的次数特别特别多的话,是不是就会出现我们大量的数据都会被分发到同一个分区里面去啊,经过KPI之后得到的是同一个结果,对吧?那所以在这种场景下,这就是我们说大数据处理里边的经典的数据倾斜的问题,那对于这样的一个数据倾斜的问题,应该怎么样去考虑,怎么样去优化呢?那其实思考也非常的简单,就是在这里你不要非常简单的直接给一个所有数都一样的K,对吧?或者说如果说之前我们做K外的时候,这里的这个K你会觉得它有一点倾斜,有一点问题的话,那这里我们就不要直接以这个作为K,而是怎么样呢,自己再重新做一个设计,比方说在这里边,呃,我们这个比较简单啊,这个场景就是所有。
04:10
这数据都要放在一起去做统计,哎,那我怎么办呢?就前边这里啊,我就干脆直接不做这个map,成这个二元组这个类型,然后K下限一了,我直接对它做一个啊,就是我们这里边可以直接对它做一个map map成一个,就是自己想要的一个K和一个一的这种情况,对吧?哎,我可以直接做这样的一个操作,哎,那比方说这里边我们自己定义一个my map my map生成一个,大家想怎么样的这个key就会最为均匀呢?是不是随机生成的K就最均匀啊啊所以这里其实你直接随机生成K啊,比我们直接这里做这样的一个配置,其实效果会好很多啊,那比方说这里边我们做这个map里边自定义啊,自定义自定义map。
05:10
随机生成分组的K啊,那接下来在这个map里边,我们其实所做的这个操作啊,这plus my map extend,哎,当前这应该是一个map方式对吧?我们现在没有用到其他的一些,呃,像状态编程啊,或者说生命周期,那直接就用简单的最基本的这个map方式就可以了,不用reach版本,那这里要注意它的输入输出是什么类型呢?输入啊,样例类类型对吧,一开始还是样例类类型输出,那我还是把它迈成一个二元组好了,只不过当前的这个K我要稍作调整,就不要直接给定了啊,这里边随机生成对吧?随机生成一个k user behavior,然后输出的是一个string long,长整型的一个二元组,然后必须要实现的一个方法当然就是map了,重呃。
06:10
我们去重写这个方法对吧,最终我们要生成一个二元组,那这个二元组呢,前面这个字段我直接随机生成,直接用啊scla的一个random object对吧,直接去随机随机生成一个,既然它是string类型嘛,我直接next string,然后呢,定义一下它的长度,这个随便给啊,我给一个十,然后后边再来一个1L对吧?诶直接在这里边定义出来就完事了,然后后边我们再去做,根据当前的这个下划线一啊第一个位置这个K再去做,呃,重新分配这个分组的时候,那大家有想到了随机生成的这样的一个string,十个字节长度啊,这样的一个string,那肯定经过哈希之后,那我们可以有理由相信啊,基本上是会平均分配到不同的这个,呃,这个当前的这个slo上的对吧,然后后面呢,后面我们这个不同的lo上得到的那个。
07:10
果后续又会就是到到后边我们做这个处理的时候,后边又会给到,呃,就是相当于我们还得把它再聚合起来,对吧?因为你现在比方说我们我们并行64,那你应该是分成了四组,分别统一统计当前分组里边所有这个PV值,所有的这个count数量,那最后是不是还得做一个萨马啊,所以这里边我们可以后边啊后续直接定义就是接下来在后面应该要去进一步的操作,哎,那后面做操作大家会想到根据什么去分组,然后再做聚合呢?那不是还是我们之前说的做那个window,根据window and表示当前同一个窗口,然后把四个分区的数据叠加起来就完事了,对吧?啊,所以这里边我们单独还是重新单独定义一下吧,哎,这里边比方说我定义一个啊,上面那个叫PV stream,我这里边叫total PV stream。
08:11
最后求和,那就是基于前面的PV stream,然后做一个KBY,当前要以这个window and作为一个分组标准啊,那后边呢,呃,大大家也可以直接去sum对吧?呃,Sum的话,那你就是指定当前是以哪个字段作为一个sum,比方说我们当前的这个字段,那就是我们不是后面这个就叫count吗?我就以这个字段作为这个sum的标准对吧?来做一个sum,得到的结果那就应该还是当前的这个,就是按照我们定义好的样例类啊,PV count view这样的一个PV count这样的一个样例类,最后输出一个window and对应的一个一个结果,对吧?所以最后我们输出这里的给一个这个total PV stream,把它做一个打印输出,然后现在再看一下它的运行结果会是一个什么样的效果。
09:06
大家看一下这里边我们输出的结果,这里面输出的结果是什么呢?哎,就是我们大家看到啊每一个分区,哎这里边一开始这个是这个数据很多啊,一开始这个是有第二个分区对吧?大家看到哎这好像我们最后输出的都是第二个分区,一开始我们在运行的过程当中,其实看到还是有其他分区的,对吧?啊有三有四有一,其实每个分区都有,但这里面最后我们看到的好像就都是第二个分区了,这个这是为什么呢?而且我们看到这个输出的数据量特别特别大,对吧?而且输出的都是同一个window,我们看到这都是同一个window它的这个值,而且你看到这个值是什么,就是一个一个在叠加,对吧,二七二八二九三十,就就一个一个在在做这样的一个叠加,这个原因是什么呢?啊,这个其实原因也非常简单,那就是因为当前我们做这个sum的时候,大家想一下啊,当前做这个sum的时候,那应该其实就是之前有一个数据来了之候,我就要叠加。
10:06
开一次输出一个结果,之前有一个数来了,我就叠加一次,对吧?啊就是这样的一个处理流程,那所以对于我们这样的一个结果而言,我其实想要的是什么呢?想要的是把当前所有的数据都收集齐了,然后啊,就像我们说的啊,都到齐了之后,我统一对当前的这个window输出一个结果就够了,那要这么看的话,这样的一个结果,尽管最后看起来这个统计出来的结果,大家看这个,呃,48292对吧?啊,48000多好像也差不多,因为这个这个窗口我们没准嘛,呃,大家知道你现在既然并行了,我们现在直接从文件里面读取所有的数据的话,不一定是按照时间顺序输出了,你看那个最后一个不是十,只有13个数吗?那他统计的肯定很快,对不对啊,所以那个肯定在早些时候就输出了啊,而我们这里边没有没有看到,因为这个数据量不够大,所以它这个没有全部能看到那个最初的输出的那个效果,我们也没有看到,就是前面并行的啊,不同的这个分区输出的那个效。
11:06
但是中间跳动的过程当中,我们还是看到不同的数字了啊,那现在我们想解决一下这个问题,就是最后不要你这个不停的叠加,我们就最后就是输出一个结果,就有点类似于我们窗口的那个效果一样,对吧?啊就是之前来了之后,你可以增量聚合,不停的在后面去去加,但是我最后呢,只想要一个就是最最终的结果拿到出来就OK了啊那这个事情怎么做呢?呃,那其实这个整体来讲也非常的简单,我是不是可以类似于我们之前,诶就是做那个排序的那个操作,对吧?我把所有数据都收集齐了之后再去排序,那我现在怎么样呢?我也可以把所有数据收集齐了之后再再去叠加呀,啊这个也是可以的,对吧?当然你也可以定义就是增量增增增量的那个叠加啊,我们这里边就稍微的简单一点,定义一个key process function,这里边比方说我定义这个叫做total PV count result,对吧。把这个自。
12:06
己定义一个k process方式,然后我们在下面做一个具体的实现,Case total PV count result,当前是一个key的process function,哎,为什么要用这个key的process function呢?那其实我们想到这就涉及到一个你怎么样去判断当前数据都到齐了,对吧?当前window的数据都到齐了,怎么判断呢?之前我们不是就是定义了一个注册了一个定时器,一毫秒之后触发吗?Window and1毫秒之后触发吗?现在也一样啊,对吧?你只要是过了当前的那个window and的时间,那其实所有的数据肯定就都到齐了,这个时候我触发一个定时定时器啊,然后把之前所有的数据做一个做一个处理,最后那个和输出不就完事完事了吗?啊,所以这里边我们K的process function式里边传入的,大家还记得那个类型吗?KIO对吧?哎,当前K的类型是什么呢?那你看这个KBY的时候下划线,诶当前这个window and是长整型,那我这个就不是。
13:06
两组类型直接就是长整形对吧,直接把这个长长整形做一个输入。定义好这个K的类型,然后输入的数据类型,当前我们输入的数据类型是什么?这不就是之前已经聚合好的那个PV count样例类吗?哎,所以我得到的输入是PV count,得到的输出也是PV count,对吧?形式都一模一样,直接把它定义在这儿,我们养成好习惯,每次都确认一下,诶,上面没问题对吧?类型是正确的。所以接下来我们主要要实现的就是process element这个方法了,当然在这个过程当中,大家会想到我如果这里边这个process element是每来一个元素都会掉一次,对吧?如果我们想实现增量聚合的话,你你如果只是想实现这个,呃,就是数据都攒起来,然后最后再做计算的话,那你就来一个,还是用一个list state来一个数保存进去,对吧?到最后把数都拿出来叠加完事了,那这里边如果要做增量聚合的话,也非常容易,是不是我只要定义一个状态,定义一个状态。
14:14
保存当前,哎,所有count总和是不是就够了,哎,所以这其实更简单,就是一个value state嘛,Lay定义一下啊,定义一个value,哎,那这里边我们定义一个total PV count result state对吧,这是一个value state,它里边的类型那就应该是一个,既然是求和嘛,当然是一个长整形了,里边我们定义的时候get runtimes,直接get state里边去new一个value state script里边给一个名字,当前是total total,我就直接叫total PV好了啊,然后plus长整形老这样先把它定义好,然后接下来呢,就是每来一个元素,是不是就是把它里边的那个PV count里边的那个count值叠加在这个状态里边就完事了啊,大家大家能想到。
15:14
到这个这个过程对吧?哎,所以我们接下来要做的事情,其实就是每来一个数据,将count值叠加在当前的状态上,好,所以接下来我们这个我先把这个当前的状态拿出来吧,对吧,这个叫做current total count,我们直接从这个total count PV PV count,诶这里面大写了啊,看着不舒服。把这个从里边直接用,诶用一个这个get方法调调它的这个,呃,当当前的这个value方法对吧,不是get方法啊,Value state直接就是点value获取到当前的值,把它拿出来放在变量里面,然后我们是不是就直接叠加,诶那就是这个这个状态直接更新就完了,对吧?哎,我们更新的时候呢,就用当前current to count再加上value.count就可以了啊,但是大家如果要是觉得这个写的有点太麻烦,你直接用这个点value再加上这个对吧,直接把这个传到里边也是一样的啊啊啊,但是就是我们这么先拿出来之后看的会更明显一点,然后接下来记得注册定时器,注册一个呃,Window and加一毫秒后触发的定时器ctx timer service,注册一个当前还是事件时间用value.window and。
16:50
再加上一,哎,这样就搞定了,那后边呢,定时器触发的时候到底要做什么操作,On timer里边定义我们要做的非常简单,是不是就把那个total偷count拿出来输出就完事了呀,对吧?哎,这里面不需要排序,只要拿出状态输出就完事了,所以我们定义一个total PV count,就是从当前的这个total PV count result state里边点value拿出来,然后接下来呢,包装成一个还是一个PV count样例类输出,哎,当前我们要的呃,这个窗口应该是哪个窗口呢?诶,这个大家知道window and其实就是在这个他S减一就是对吧?或者我们还可以怎么样用这个ctx,大家知道这个ctx里边不是有current k吗?因为我们当前就是按照window and做的分组嘛,所以你get current key直接拿出来就是,另外还有这个total PV count放在这就完事了啊,最后输出之后不要忘记。
17:50
我们再做一个清除的操作,清空当前的状态,可尔对吧?哎,这样的话整个处理过程就已经完成了,哎,接下来我们来测试一下当前的并行中,应该还是四对吧,运行一下看看这个效果怎么样。
18:10
大家看到现在已经诶统计输出的结果,这里边确实就是12344个slot都用上了,对吧?哎,真的是在并行执行了,而且啊,当然这这个顺序是没准的啊,不一定是按照我们那个窗口结束的时间,因为当前是读取文件数据嘛,处理很快,有可能本来应该是后面的数据,但是这个窗口的统计结果提前就输出了,你像呃,这里边我们大概的看一看啊,基本上都是四万多五万多对吧,最大的一个,哎,这个呃,686800这个时间点的这个窗口啊,这一个小时的数据是52500多啊,那最后一个窗口呢,13个数据,这跟我们之前统计的数据是一模一样的,但是就实现了并行输出,如果要是说我们这是一个真实的流式处理的系统的话,那大家会想到这就不可能说是乱序了,对吧,因为它是我们定义的,是有时间限制的嘛,一个小时再输出一个一个窗口结果那肯定就是按照这个时间,但是呢,处理的过程当中有可能。
19:10
都是不同的这个呃,线程不同的lo在输出对吧?啊就是这就充分利用了我们系统的并行能力,对于数据倾斜的一个处理就是这样的。
我来说两句