00:00
上节课这一部分我们已经实现基本功能了,呃,大家会发现我们这里边直接输出的呢,就是每一个窗口统计出来的PV个数,但是通过具体的分析,我们其实发现,呃,这里边如果要是说想要做一个并行测试的话,即使并行度调大,我这里边。直接也只有一个输出,就是一个分区的输出,对吧?啊,那关于这个原因,大家想到这并不是说因为我这里边这个并行度给的还不够,你你就算是再调大一点,给十给40给100都没用,对吧,因为我们知道后边。KBY的时候是基于哈西扣要做一个重分区,而我们这里边是不是所有的元素都卖不成了同一个K啊,那所以呢,当然你后边这个分组就是全分到同一个分区去嘛啊,所以之前我们不是说过尽量避免用window window or大家还记得吗?那你想当时说的,为什么尽量避免用in呢?就是因为它是直接把我们这个数据是直接就可以开窗去做统计了,但是它本质是不是相当于把所有数据分到同一个分区,就相当于没办法并行了呀,诶那家想你这个做法,这不是跟WINDOW2本质是一样的吗?
01:12
哎,所以这里面其实有这个问题存在,那怎么解决呢。简单想的话,那大家说是不是这里关键就在于这个K啊,这里对大家想你这个K如果要是直接用这个统一的PV的话,那当然最后导致的结果就是全部分到同一个分区,那如果说我们选择另外的一个K,是不是就可以避免这种情况了呢?哎,所以其实这个很简单啊,就是前面比方说我们还像一开始做的那个,呃,就是hot hot item一样,上来之后,比方说我们以这个item ID做一个分区,做一个分组,或者说以user ID做一个分区,做一个分组,那大家想是不是接下来它就会分到很多个不同的K里面啊,啊,然后就再把这个做一个整合就可以了,那有的同学可能还是会有一个会有这样一个考虑,就是说你要按照那个去分的话,首先有一个问题是,可能那个K太多了,对吧。
02:09
我们觉得就没必要,因为你对于我们现在这个需求而言,是不是其实就是一个分区,一个P,其实就就可以把这个搞定了呀,那就是直接我这里边直接做统计就完事了,对吧?然后另外还有一个问题在于我们这里面如果说某一个比方说user ID,或者说某一个这个item ID。它对应的数据就是特别大。那大家想这是不是就是相当于还是会有这样一个问题,在于大量的数据是不是就堆到了同一个分区去做处理啊?啊,这就是我们说在生产实际当中更容易遇到的所谓的数据倾斜的问题了。大家想,这如果要是大量的数据都堆积在同一个分区,那是不是就导致我这一个分区有可能数据都堆着啊,一直处理不完,但是别的分区有可能都闲着对不对。资源利用率就非常的低啊,所以这其实在大数据处理环境里边是经常出现的一个问题,怎么解决呢?
03:07
其实核心还是在于这里边我们K的设计上,如果大家想到我能把这个K就相当于设计成一个均匀分配,分配到每一个分区上这样一个效果的话,那最后是不是相当于各个分区的数量就差不多啊,最后我们的这个任务的呃,就是整个相当于也有一个负载均衡的呃状况出现了,对吧?呃,整个是一个平衡的状态。那具体来讲的话,怎么样分配key就能实现这样的功能呢?是不是随机啊,随机生成T就可以满足我们这个需求,对吧,所以这里边我们做一个。呃,并行,并行任务改进,整体思路也非常简单啊,我们是设设计随机key,然后解决,其实也是在解决这个数据倾斜的问题。
04:05
这里我把前面的这一部分呢,就给大家叫做这个,这个叫PV result stream0了对吧,然后接下来我们基于前面这个还是重新写一下啊data stream接下来是要做首先做一个filter,这个还是一样的啊啊,那前面这个我就不详细写了,还是。相同的P,呃,如果如果是PV行为的话,我把它筛选出来,接下来继续做进一步的操作啊,呃,然后接下来这一步,前面我们还是做了一个这个map乘二元组,那其实我们现在能想到还是一个map,只不过当前生成的这个二元组就可以有一个考量了,对吧,你不要直接所有的来了之后都是一个PV,相同的K放在这我这里边。生成的这个数据啊,二元组里边,我要求首先前面那个K的那个字段呢,大家想这个也没必要非得是字符串对吧?啊,我其实是不是直接随机生成一个,比方说随机生成一个一到十的之间这这样一个整数就可以啊,然后接下来按照哈西扣的直接重分区,就可以分到不同的对应的这个呃,不同的分区里面去,对吧?啊,所以这个其实是比较简单的,我们直接给一个整数作为当前的K,然后当前的值呢,哎,当然就还是啊,给一个一直接放在这就可以了,那下边具体实现的过程。
05:30
也非常简单,呃,这里为了我们方便生成随机数,可以先创建new一个这个random这样一个。随机数生成器对吧?把这个random设置定义出来,接下来return的时候就要new一个二元组里边,当前前面的这个整数K随机生成random.next int,比方说我就是在十之内对吧?直接给一个十以内的整数,那这个的话就主要是看大家怎么样去考量了,因为如果说你给的这个数特别少的话,那相当于我们的K就很少。
06:09
数就不太分散对吧,但是不太分散有可能带来另外一个问题,就是我们取了哈西扣之后,是不是有可能就就相当于我们最后是依照他的那个哈西扣重分区的嘛,那你假如说我就分四个分区,1234结果。比方说啊,一和一和三,然后求了哈西扣之后那个,我因为我们不知道它底层的那个算法是什么样的吧,呃,正常来讲肯定不会对吧,肯定不会说再分到同一个分区里面去,就求了哈希值之后再去做一个曲模运算,是不是有可能就分到了还是同一个分区啊,啊就是所以在这种情况下,避免这种情况,我们一般是会给的比我们的分区数稍微大一点,对吧?啊分的K稍微多一点,然后再重新分配的话,随机分配就会啊,比较整齐啊,就大部分都会平均分开啊,所以这里边比方说我随便给一个这个十在十以内的一个数,另外后边当前是不是就直接给一个EL就可以了,对吧,直接把它map成一个二元组,后边接下来是不是就一样了啊,接下来我就直接。
07:14
KBY啊,然后接下来我直接用这个,呃,我们前面直接KBY0啊,但这里边如果说呃,你不想让他得到那个结果还是一个,大家知道这个得到那个K的类型是不是还是一个temp类型啊,如果你不想让它还是temp类型的话,我也可以写成一个拉姆表达式,是不是可以用贝塔的F0去做一个指定当前的K,那大家知道现在我的K是什么类型?是不是就是int类型啊啊,这个就没毛病了啊啊,那后边我们就还是开窗,当前我们是一小时的时间窗,滚动时间窗口后边做聚合,我们还想到想加上那个window的信息,因为现在并行嘛,你如果要是统一输出的话,有可能这就乱了,对吧?啊,所以我加上那个window的信息。
08:03
做一个aggregate,这就跟我们之前讲的那个一样了,是不是先去来一个增量聚合函数aggregate function对吧,这我这我把这个叫做。PV PV count a,然后后面这个我叫做用一个PV count result。好,我把这个定义出来啊,这个我可以先叫做PVSTEM,其实大家会想到这个还没完,为什么这个还没完啊。你这。因为这接下来我们是不是还是按照每一个K去统计出来的窗口里边的那个值啊,那所以我们要的那个窗口里边的值,是不是应该把每个K对应的那个再合起来啊,所以后面还有一个汇总的过程,那大家想一下这个汇总怎么样去汇总呢?
09:00
呃,其实是差不多的对吧,我我我们后面啊,然后将。各分区。数据汇总起来。大家想到接下来的话,是不是相当于跟之前那个类似,我们之前是把同一个窗口里边的数据收集起来,要做排序,那大想现在我是不是把同一个窗口里边的数据。收集起来那个countt求个和呀,哎,所以这里面其实跟前面是非常类似的一个一个做法啊,当然就是有同学可能想到了,就是我这里边是不是直接PV stream,直接点KBY,当前的这个window and,对吧,我们用这个page view count里边的window end,然后是不是直接接下就是我这里边key了之后啊,直接做一个sum是不是就完事了呢?哎,当然这样也可以,但是大家想这样萨的话,我们这里边是不是来一个数就萨一次啊。
10:02
最终相当于这个一同一个window。大家知道同一个window结束的时候,尽管应该每个K对应的那个count值是同时输出,但是我们在流里边也是有先后的,对吧,那最后你如果直接sum的话,就会导致他是不是来一个就萨输出一次,然后再来一个是在之前基础上再叠加一次啊。大家想想是不是这样啊,我这里边可以先直接给大家把这个直接sum起来啊,比方说我现在要sum的是不是当前的那个count。对吧,我直接做一个这样的定义,然后我们把这个定义出来,当前啊,上面这个还是object啊,所以这边有问题,我们想要的是那个。大家还记得我们想要的其实就是包装好的那个P,呃,Page view count这样的一个,这样的一个,呃,对我诶我们当时是是叫这个pay对吧。这就是我们包装好的那个,呃,对,对应的那个po类型嘛,所以这里边我们把它改成page view count这样的话类型就对了,对吧,这个最简单的想法,直接这么写肯定是没问题的,我可以把这个改成一个this stream啊。
11:15
这个就叫做PV stream。好,这就是我们整体的一个处理流程,我先这么看一下啊,然后这里边大家看,接下来就是实现这两个自定义的。一个aggregate function,另外一个window function,过程还是几乎是完全一样的,实现自定义。域聚合函数,或者叫增量聚合函数public static class,呃,这里边我们是要implement一个aggregate function里边的类型还记得吧,In ACC out,当前的输入是。当前的输入应该还是,诶诶大大家注意有些同学可能说,诶输入是那个u behavior嘛,原始的那个数据嘛,注意我们现在开窗统计的是不是已经map成二元组了呀,诶所以这里边我需要把这个二元组类型当做当前的输入类型。
12:17
然后中间的聚合状态和输出,是不是我直接用那个长整形就可以了,就一个数嘛,好把这个定义在这。呃,我们这里是看一下这里边哪里有问题啊哦,这里边这个。哦,这里边这个。应该是这样对吧,这样就对了啊,少多了一个这个间括号,所以把这个写出来的话,上面这里边第一个就不报错了,对吧?啊,这就没问题了,接下来里边当然就是实现这样四个方法了。Create,这是0L,然后这里边是accumulator加一下边是返回一个accumulator。
13:03
这里面墨的话,A加B跟之前完全一样。好,然后接下来我们是实现自定义的。窗口函数就是全窗口函数啊,Window function public static class啊,我们当前这个是叫PV result result对吧。好,那么我们接下来implement一个window function。里边四个类型大家还记得in outt key window对吧?哎,那么in当前应该是前面预聚合的那个输出是个长整型,那out的话是我们包装好的想要的那个port类型,对吧?配置view count,呃,然后接下来当前的key的类型呢?因为我们当前是不是已经直接写按照这个拉姆达表达式直接定了,那当前是不是key是int类型,这里就是int对吧?这个大家要小心一点啊,这里边的类型一定要注意inte。
14:05
然后接下来还有一个window类型time window,把这个写完之后再确认一下上边没有任何问题,类型是匹配的,那我们就可以把这个对应里边的apply方法做一个实现了。嗯,我们现在想要的那个apply是不是就就是里边我们想要拿到的东西就是。对应的其实就是一个一个,呃,就是return,一个不是return,是要用out.collect对吧,里边要去new一个page view count里边的字段,首先我们要的是当前的那个。呃,当前的那个URL,现在我们没有URL对吧?哎,那干脆直接把这个integer直接to string放在这,就当URL放在这儿得了对吧?分区号啊,相当于那个key编号,然后接下来window.get and,这是window and,另外还有一个count值,那是不是直接拿input里边就一个literature.next拿出来就完事了,对吧?哎,这就是我们前面要做的这些操作。
15:13
那已经做了这样的一个改进之后,我们接下来运行一下,大家看一下效果怎么样。好,那接下来大家看一下我们这里边运行的结果,诶大家看现在这个结果是不是就相当于分了不同的分区都有输出了呀,诶这里边现在确实就是一个并行执行的这样一个状态了啊,然后但是这里面大家会发现这里面输出的好像并不止,呃,就是像我们之前的那个,就是只有十个窗口的输出结果,这个原因是为什么呢?为什么这里边连续,哎,你看这个我们这个URL啊。呃,等等于三的,然后,然后他这个不停的在输出,不停的在输出,对吧,等于二的不停的在输出,哎,这个是为什么呢。这其实就是因为我们现在做py sum的时候,这是不是来一个数据要处理一次啊,所以之前我们按照不同的分区已经把它这个做了计算之后得到的那个URL,其实应该就是当前分区里边,那可能是就是一个URL对应有一个count值对吧?呃,对应的有有这样的一个数出现的,你看这里面比方说5200这样的一个。
16:26
呃,一个window and啊,我们找另外的这里是不是也有5200啊,大家看前面这一个是4000多个数,这里边有9000多个数对吧?呃,那大家自然就想到了,我当前其实是分到不同的分区去对它做了一个,呃,之前我可能有不同的K,然后呢,分到不同的分区去做了一个统计,接下来我是不是再把它K外之后分到同一个分区里面要做一个叠加呀,所以你看到5200这一个对应窗口里面的值是不是不停在上涨。大家看一开始4000多,然后9000多,14000,然后一万九对吧,5200后面呃,24000多,28000多,如果大家仔细看的话,你会发现就是最终每一个窗口大家看5200最多应该就到这儿了,对吧。
17:15
48000多对不对,最终最大的那个数,基本上跟我们之前是基本上是匹配在一起的啊,我们看一下后边还有这个比方说呃,这里边这个668800这样的一个一个数啊,下面还有没有。大家看这个最最大是不是到47000多啊,我们记得之前最大的应该有五万五万两千多,对吧,大家看这个683200是不是最大有52200多啊。哦,还有一个应该比这个还大啊,这个686800,这个是52500多,跟之前的那个数基本上是一致的是吧?哦,但是就是说我们看的话,看的会非常的混乱,因为你针对每一个窗口,它会。
18:01
连续不断的输出,只要有数据来了之后,它就更新,哎,所以我们这里边可以改进一下,怎么样让这个窗口只有一次输出呢?哎,大家就会想到,那是不是还是像之前那个做法一样,之前我是把说所有数据都收集齐了,最后再排个序,现在我是不是可以把所有数据都收集齐了,再求个和呀,诶所以这里边啊,改进的话也非常简单,直接在这儿是不是,那这个等到所有数据收集企业,那是不是我又要注册那个定时器了啊,所以又是process function啊,这里边你又一个定义一个total PV count这样的一个process function,后边的话我们要做一个自定义的实现,实现自定义处理函数,其实关键要做的事情,其实就是把。相同呃,窗口统计呃,就是相同窗口分组统计的count值叠加,这就是我们想要做的事情啊。所以public static class定义一个PV total PV count。
19:28
Total p count,然后接下来我们要extend啊,继承一个对应的那个抽象类,就是K的process function里边的类型是KIO,哎,那当前的K我们分组其实是按照那个window end去做分组的,对吧?大家看前面我这里面是不是还是要按照我当前的那个每个窗口去做分组啊,所以接下来。接下来是不是这个直接就是长整型的T啊,对吧,我们用的那个方法引用吧,不再是ta类型了啊,然后接下来它的输入输出呢,输入是page view count这样包装好的破类型输出,输出是不是我们类型还是要page view count呀。
20:14
对吧,只是做个求和嘛,这个不需要改变,所以还是page will count放在这就可以了啊,所以这个一写的话,上面的类型就不再报错了啊,首先确定我们这里写的是对的,然后接下来那就是每一步具体的操作跟之前非常类似,是不是首先应该定义一个状态啊,对吧?呃,这主要就是要保存。当前的。总count值,大家想,现在我就没必要像之前那样拿一个例子或者拿一个map把所有的数都存下来了,我是不是只要来了之后叠加一个count就够了?然后等到所有数据到齐的时候,我是不是直接输出那个count值就完事了,哎,这就是我们标准的这个流失处理的方式嘛,呃,这里面我只是保存一个,是不是只存一个value state就够了,Value state。
21:13
那既然是长整型的那个抗值嘛,Value state law,这个我就叫做total count state,那对于这样的一个当,呃,我我们对于。我们现在如果要是接下来要去做一个实现的话啊,那就是应该要有一个。要要有对应的这个open生命周期里边要去获取当前的状态句柄,然后去做一个定义声明,对吧?呃,Total count set,那就是get runtime contacts,然后get里边要你一个是不是value state script啊,当前我把这个叫做total count。然后当前就是长整型点class定义出来对吧?啊,当然这里边就是你也可以去定义一个初始值,大家想我这里面初始值应该给什么。
22:11
因为我是count嘛,所以初始值是不是给一个零啊,这方便后边就就省得再去考虑他一开始还是none的那种情况了啊呃,这个偷个偷个懒,然后接下来里边还需要必须实现一个process element方法,对吧?那么我们现在的做法其实怎么做呢?是不是每来一个数据之后,现在不是不是加一了,或也不是说那个直接把它丢到那个list state里了,而是要取它里边的count数量,是不是要叠加到当前的total count state里啊,所以我当前应该是total state要做一个。Update对吧,Update update成,诶大家想是不是我要是在之前的那个基础上是不是先做一个提取啊,对吧?点value先拿出来,然后再加上value.get count,这样做一个叠加,再去做更新对不对啊,这就是我现在更新当前这个总共count值做的操作,那另外就是注册一个定时器timer service timer,对吧,同样还是用当前的window and加上一毫秒,只要触发就可以了。
23:22
那最后是不是还必须得有一个on timer的实现啊,啊,那其实我们知道定时器触发的时候,是不是相当于所有分组count值是不是都到齐了,都到齐那么是不是直接输出当前的总count数量啊,那所以这里边其实就是呃,我先定义一个这个total count吧,当前的total count,那是不是就应该从状态里边拿出来就完事了,点value拿出来对吧?然后接下来out点是不是做一个输出啊,你一个page view count,我们当前啊,比方说我这个不要用当前之前那个分区数量了,我直接管它叫这个PV或者叫total就可以了,对吧,URL给一个名称啊,然后接下来当前的窗口的那个window end是不是直接get cover key啊。
24:19
我现在分组就是按窗口分的嘛,然后最后再来一个当前的total count。这就做完了对吧?啊,当然为了清空当前的状态,是不是每一次触发了之后,我就把当前这个total count state直接清空啊,清空状态total count state clear,这就是一个完整的流程,对吧?好,那我们来测试一下这个代码,看一下现在的运行情况怎么样。啊,我们把它跑起来,大家看一下预期的话,应该还是只有十个窗口的输出,对吧。
25:04
我们看一下当前结果已经输出了,确实是啊,几个不同的分区都有,都有这个输出对吧?当然可能不是特别均匀,因为我们只有四个分区,结果我们是十个K对吧?啊,它这个就有可能是不均匀的,大家可以按照这个在设计的给它均匀一些啊,我们看到这里边啊,这个其实不一样,最后的结果跟我们前面的那个就是K的随机分配,分配到十个上,其实没关系,大家知道这是为什么吗?我们最后的这个输出,其实是按照什么的输出啊。是不是按照window end,然后做了分区之后的那个输出啊,对吧,后面又做了K了,按照window end,呃,这个分区输出了,所以这个其实是没关系的,对吧,后边你那个当前的这个窗口,最后在哪个分区上去做呃,聚合的统计,这个是无所谓的啊,我们整整个来看的话,哎,你看现在这个顺序就乱了,对吧,这就没准了啊呃,但是大家可以看得到,基本上还是四万多五万多这样的效果,然后我们看到在这个呃,686800的这个窗口啊,结束的这个窗口,它的最大值52500多对吧,52552。
26:13
这就是我们得到的这个效果啊,当然最后一个窗口,69400的这个窗口,它只有13个数,所以说它的数据比较少,别的基本上都是四五万的一个状态。这就是对于PV值进行统计,然后我们考虑到数据倾斜,考虑到并行的一个优化。
我来说两句