00:00
但已经得到了窗口聚合结果,那其实最后就剩一步,那这一步就是是不是就要收集同意窗口的所有商品count数据,然后排序排序啊,输出top n对吧?哎,所以我们接下来其实就是做的这件事情啊,那按照我们之前的分析,现在其实是。首先是不是要按照window and做一个分组啊,这样的话就是每一个窗口一个组对吧,然后就各排各的,所以最终我们得到的一个data stream,那大家可能会想到我最后要转换成一个什么数据输出呢?当然你也可以就是把我们最后得到的这个一堆count的那个数据保存成一个list,对吧,变成一个列表,直接输出也是可以的,但是呢,呃,我们现在是想给大家做一个直观的在这个控制台做一个直观的打印输出显示,所以我还要把它做一个格式化,对吧,做一个这个写上字符串啊,直接让大家看得更清楚一点,所以直接得到的就是一个this stream stream啊,那最后这就是一个。
01:14
这就是一个result stream了,基于前面的window a stream先首先要做一个k by window and对吧?啊,就先把这个window and先定义出来。我们是按照。窗口分组,现在的window and就代表这个窗口嘛,然后后边因为我们用到了状态编程,还用到了定时器,哎,那是不是这里边就直接是process对吧?啊,直接就是放大招的这个过程,所以里边要去new一个自定义的,现在是一个key的process方式啊,我这里边给一个名字啊,就叫做top n对吧,Hot items,这里边还可以传参,因为我并不知道这个hot n top nn到底是多少对吧?我可以把这个N作为一个参数传进去,比方说我取前五,哎,这就是用自定义处理函数。
02:17
排序取前五,这就是我们想要实现的这个功能啊。好,把这个对齐,呃,那当然这个做完了之后,大家下面可以直接把这个结果做一个打印对吧?Result直接做一个print输出啊,呃,当然你想看到这个结果的话,也可以有一个对应的那个前面有一个提示啊,当前到底是什么东西,我们这里面直接输出就可以了,所以接下来关键的就在于实现自定义的这个key process方式了,实现自定义key的process方式。Public public。
03:02
Static class哎,我们实现这样一个一个类啊,那么它需要去注意现在不是implementment,因为对process function都是负函数,对吧,至少它都是一个负函数,所以extend一个K的process function这里面有泛型,他的泛型大家还记得吗?KIO对吧?三个类型当前首先给的是当前建的类型,现在的键是什么呢?哎,大家注意window and,但是呢,我们现在用的还是这个字段名称的写法,是不是还是ta啊,哎,所以还是元组类型啊,直接写在这儿元组类型,然后后边还有这个IO当前的输入是什么呢?输入是前边我们得到的那个聚合结果对不对,大家看这就是流式处理嘛,一步一步来的,对吧,上面的输出就是现在的输入啊,Item will,错了,Item will count。
04:00
然后输出我们定义好了,是要格式化成字符串对吧,是一个缀,写完这个之后还是校验一下上面啊,这里面有一个错,那是因为有一个私有属性,我们这个还没搞定对吧?啊,这个调它构造方法的时候要传这个参数的啊,所以我们还是定义一个定义属性。定义属性,呃,就是top n的大小对吧,其实就是那个top n的N,所以这里面我们看怎么去定义啊,呃,Integer,比方说你就定义成N也行,或者说我更明显一点,我就叫做top size吧,对吧。然后接下来我们实现一个constructor,现在的话上面就没毛病了,对吧?哎,这个就没问题了啊,接下来大家想到我因为用到了状态,是不是首先要把状态定义出来啊,定义状态,而且我这里面是一个。
05:00
是不是一个列表状态啊,List state,因为我要保存当前所有的item view count,对吧,就是保存。当前的窗口内所有输出的item will count啊,那这里边我就定义一个list state里边的类型啊,这个我就简单粗暴啊,就是用这个item will count原封不动的这个类型直接全保存下来了,那有同学可能说你既然当前已经KY window end了,是不是没必要在这个里边再保留着那个window and信息啊?对吧,你可以把那个删掉,保存成一个二元组也行,对吧?啊,就是一个一个item midd,一个count值就就够了啊,这个无所谓啊,我们多一点数据,这个现在并不影响你如果想优化的话,可以把它省一些啊,那接下来就是我定义一个item count list state。它的具体获取那个状态句柄,这是个k state对吧,根据当前key相关的嘛,那它是不是获取就必须在open生命周期里边去拿到当前的。
06:15
拿到当前的这个运行是上下文啊,Get runtime contact,然后去get list state,里边去new一个list state script,这个大家还记得吧,然后里边给一个名称,当前这个名称是,呃,这个随便给啊,比方说我叫item will count list对吧,随便给一个名称,然后外边当前的这个类型比较好写,就是item will count.class这样是不是就完成了?这就是我们对于这个list state的一个定义啊,这就是想要的东西都已经列好了,接下来必须要实现的一个方法。Process element每来一条数据是不是都会调用到这个方法呀?那大家想一下,我现在每来一条数据要干什么事呢?因我当前很简单对吧,是不是就是每来一条数据是不是就存入list子中啊?但大家注意还得干一件事,是不是还得注册定时器啊,对吧?并注册定时器,所以接下来我做的操作非常简单,是不是直接艾一个数据进来,就是当前的value对吧?然后另外在ctx里边有一个timer service,去注册一个事件时间定时器,Even time time timer。
07:39
里边我定义的那个时间戳,应该就用当前的。是不是就用当前的window and,然后对再加100,或者甚至我可以更短一点加一是不是都行啊,啊,这个肯定没问题对吧?所以这里边我直接用value.get window and,然后再去加一是不是就完事了啊,这个其实就是这样的一个定义过程啊。
08:05
有同学可能会又有一点疑惑,就是说那这不对呀,你这每来一个数据之后都注册一个定时器,那你这最后注册了多少定时器啊,那岂不是注册了好多吗?A,大家注意一下啊,我们说在flink底层它处理定时器的时候,它是按照什么来区分定时器的呢?时间是不是就是按照时间戳啊,那你看我们当前同一同一个窗口里边的这个数据来了之后,注册的定时器时间戳一样不一样啊,因为它就是按照这个window and分组的嘛,所以当前组内是不是肯定都一样啊,所以大家想是不是我相当于是同一个闹钟啊,对吧,这个重复注册这个没用啊,就是最后还是同一个闹钟啊,所以这个大家不用担心,好那么这个就其实已经保存完了,那最后最关键的问题是在于对我们还没排序呢,那是等到定时器要触发的时候,On timer里边接下来是不是要做一个排序啊,对吧,就是定时器定时器触发啊,那么当前以收集到所有数据是不是就可以排序输出了,对吧?输出top n啊啊,那所以这里面有一个问题,就是说我是不是首先得先拿到当前的所有数据啊。
09:25
对吧,当前所有数据你总得先先拿到再说嘛,呃,那这里边我可以去定义一个a list对不对,因为大家知道这个list set直接去做去做去做排序,你直接get之后,大家还记得那个get之后拿到的是什么吗?我先写一下啊,就当前我们的数据不都在这个list state里边,如果get的话,拿到是什么。拿到的这个是一个。哎,这里边本身是一个out类型啊,大家看到这个out呢,这是在定义在这个endding state里边的,它的这个输出out类型,那具体来讲的话,它其实本身是一个,大家一看它是不是11TERRIBLE类型啊。
10:08
对吧,里边可以拿到的是它的那个eator啊呃,是可以去去拿到它这个东西的,所以这里边我可以去简单的去做一个转换,就比方说我拿到这个eerator,但知道拿到eerator我也不能直接sort呀,是吧?诶那那所以是不是还是转换成一个list比较靠谱啊啊所以接下来我就准备要把它转换成一个a list。AA,呃,这这个其实我们可以直接用这个list啊,这个这里边有一个方法,List可以大家看直接new一个a list,然后里边其实是不是可以传一个able或者ator类型啊,对吧,这个是完全可以的啊,所以我可以直接把它这个传进来,呃,然后我把这个定义一下,比方说我这个就叫做item view counts,对吧,这个是完全没问题的,有了这个之后,接下来我是不是直接就可以排序了,对吧?Item will count,直接salt,那大家知道这个a list如果要做sal的话,是不是必须里边得传一个comparor啊。
11:13
Competor对吧?哎,那这里边我自己得写一下了啊,你得定义它排序规则,哎,那这里边competor,那大家说一下当前我是该怎么排呢。是不是要按照当前,因为I you count嘛,里边有那个count字段对吧?我要按照当前count字段的大小,是不是要倒序排列啊,那倒叙排是怎么排,这个大家还记得吗?是不是当前O1和O22个数来了之后,大家知道它最终那个得到结果是要判断它这个值,呃,是是是正负零对吧,那就对,是不是就互相减就可以了,所以我这里边直接return应该是前减后还是后减前,后减前对吧?这样的话就是一个倒序,就如果后边比前面大的话,这是正常的,这不用再再调换顺序,如果小的话,那就需要换换顺序了,对吧?哎,所以这是一个后减前的过程,那我现在是呃,O2是不是要get count呀,但是这里有一个问题。
12:11
大家注意这个compare这个方法返回的是不是一个int呀,所以这里边其实我应该是如果直接减的话,不对,我应该是怎么样。大家会想到我是不是应该要把它转成一个int类型才能建啊,啊对吧,所以这里边其实是你可以用一下当前的这个int value啊,但是这个int value的话,大家知道这相当于是直接截取了是吧?啊其实这个也不是特别的好,我们现在是count肯定没那么大嘛,你直接截取也是没问题的啊,就一般情况是呃,这个是没啥问题的,那那大家如果觉得有问题的话,其实可以怎么样。我是不是可以做一个直接做一个判断是不是就可以了,它比它大,然后我就返回一是不是就可以了,对吧,其实就是这样的一个一个判断的过程啊,或者我直接在这写啊,我直接get countt去大于Oe.get count,然后是不是做一个这个判断啊,如果要是,呃,这个是是这个大于的话,我给一个一,如果不是的话,我给一个负一对吧,当然这个有问题,就是因为我们当前是不是还应该得有一个零的情况啊,哎,所以这个可能你要考虑的问题稍微多一点啊,我们这里边只有三元运算,没有那种四元,那那你要判断更多是不是就是if啊,就各种if else判断就完事了啊啊,所以这里边我就直接给大家写一个,因为你直接剪的话,就不会有这个问题嘛,我就还是用这种方式给大家写出来吧,对吧?呃,就是t value啊,然后减去后减前Oe.get count.t value,这样是不是就没问题了啊,如果它俩相等。
13:48
是不是就是零了啊,没问题啊,呃,这是关于我们这个排序,其实大家知道现在得到的这个item will counts是不是就已经是我们最终的一个结果了,对吧?你如果想输出的话,想输出一个例子的话,当前这个结果就就已经可以输出了,或者说你写入到卡夫卡对吧?写入到这个,呃,MYSQL写入到这个red ES什么都可以啊,那这里边我们。
14:15
没有涉及到最终的输出到哪里,我是想做一个控制台打印,所以我还要把它格式化成一个string,所以接下来做的就是将呃,就是排名信息格式化成string。打印方便打印输出对吧。呃,那这里边我就首先既然是要这个,呃,定义成一个一个string去打印输出嘛,其实我可以在后面追加很多信息,我用一个stream builder好了。呃,或者stream b buffer也也可以对吧,我用一个string builder吧,先把这个呃,String builder先定义出来啊呃,那这里边其实或者我直接就管这个叫做result啊,Result builder。
15:07
然后接下来是不是我直接在它基础上往后openend字符串就可以了啊,首先我一开始窗口开始的时候呢,我先来一个分割线。让大家看的更清楚一点啊,然后接下来往后去A判,第一个要aend是不是应该是当前窗口,是哪个时间段的窗口啊,哎,所以接下来我是给一个窗口结束时间。呃,然后后边我继续去aend,大家想这里边APA是不是是不是就应该是我们那个window and,但是这里边好像,呃,当然我可以从就是这个呃,CX里边去拿对不对?大家看到就是我从ctx里边是不是也可以get current key啊,这个也可以拿到对吧?或者还有一种方法我可以怎么样。因为大家想当前我的这个就是定义的这个定时器时间,是不是就是window and加一啊,那现在我定时一出发的时候,他三是不是就是当前的定定定义好的这个时间,所以我是不是直接就用time Sam减一,你当时加了一,我现在减一是不是就是window and呀?啊,所以这个非常简单啊,当然这个显示的不是很明确,因为它是个长整型的时间戳,所以我可以把它转换成一个可视化的一个这样的一个一个time stamp对吧,然后再to string就完事了,所以我new一个,我直接用Java CQ里边的这个time stamp对吧,把它直接转换过来,当然这里边你不不to string也行,大家知道APA的时候相当于掉了它的to string方法,对吧?啊,所以这个就OK了,这样就可以了。
16:46
然后接下来呃,我换一行做一个输出,对吧,PA的一个杠N,然后接下来呢,就是是不是要便利我当前的这一个item will count counts这个列表对吧?便利里边的数据取里边的top n拿出来就完事了,所以遍利呃列表。
17:08
取top n输出,那这个应该是一个for对吧?诶那大家可以想到,就是我可以去直接比方说放循湾华安把所有的都都这个,呃,就是循环一遍对吧,但是大家想没必要对不对,我现在是不是直接用一个下标。Int一个I去做一个控制,只要循环到哪里就可以了。是不是只要循环到当前这个top size就可以了呀?啊,这里边还需要注意一点,就是还有一个问题,我们当前top size是五啊,假如说我当前的这个item view counts就没有五个呢,这就不对了,对吧?你这个循环是不是就是越界了呀?呃,所以这里边需要还还得注意这个小细节啊,我这里边做一个是不是要取最大还是取最小啊,是不是取top size和当前长度的最小值啊,对吧?以那个最小的为准啊,所以是top size以及当前的item view counts.size对吧?它俩取一个最小值便利到这个就可以了,后面是I加加。
18:19
好,那里边我当前是不是可以先拿到当前的一个item view count呀,对吧,我把这个叫做current item view count,因为有角标嘛,所以我这个很容易就可以拿到,是不是直接item will counts.get当前的I就可以了,这是不是就当前的那个数据啊,对吧?当前第几个的那个数据啊,接下来我们做一个输出,哎,不是return啊,Result builder往后面开始啊判了,那现在就是第几名,比方说number。呃,Number几呢,当前是。哎,对,那大家想到I是从零开始的,我们当然是希望从这个一开始,对吧,所以这里边是number I加一。
19:06
然后后边我再跟上一个,跟上一个冒号对吧?啊,就是接下来就是number几这样一个数据啊,然后接下来我继续往后end。Ipad一个啊,就当前的这个商品是谁对吧,商品ID。呃,后边要跟的end的是不是就当前的那个current item count里面的那个ID啊,诶那家想我直接是不是直接get item ID就完事了。后边再跟一个当前商品的热门度,对吧,热门度。后面end什么是不是当前的那个count值啊,这不就全用上了吗?这三个字段对吧?呃,窗口结束时间,还有这个window and,还有这个item ID,还有count啊呃,当然这里边就是我把这个已经整完了之后,最后我还是应该注意一些格式化的细节,比方说再杠N再换一行对吧?呃,然后再呃,就是输出了这一行当前的这个NUMBER1,再输出NUMBER2,再输出NUMBER3,每一个都换一行,然后所有的便利完了之后,当前的窗口是不是就搞定了,当前窗口搞定了之后,我在啊PA的一个一个换,换行的分分分割线对吧,后面我再。
20:27
杠N-N对吧,这样的话就可以得到最终的结果了啊,那另外大家为了呃控制一下这个输出频率的话,是不是可以做一个sleep呀,因为我们当年是读读取那个文件嘛,对吧?呃控制输出频率设置一个thread。点sleep对吧,比方说我这个隔一秒隔一秒钟去做一个输出,这个大家就能看到那个滚动的过程啊,就真实情况下其实不可能那么快,因为我们现在数据都到齐了嘛,真实情况下是不是应该是五分钟才会统计一个一个结果,然后马上输出啊,对吧?所以这个其实我们只是做一个测试而已啊,诶那这个做完了吗?
21:09
注意,这个美版我们根本没输出啊,你只是做了这么一个result builder,那你输出在哪呢?输出是不是还得out.collect呀?大家记得这个process function里面怎么输出吗?out.collect对吧?那collect是不是要一个string啊,所以我用这个result builder是不是在to string输出就可以了啊,这就是我们最终的这个处理流程啊,整体的这个结果,整整个的这个处理过程就是这样的。好,那接下来我们来测试一下,运行一下,大家看这个代码就是。还是稍微可能这个流程还是需要梳理一下的啊,倒也不见得特别复杂,关键就是大家要把这个每一步的操作都搞清楚,到底是怎么回事。好,接下来现在是已经起起来了,我们看。
22:02
稍微等一下,能不能看到他的结果输出。大家看到现在已经得到了这个输出结果。啊,当然这个前面我们那个少了一个换行对吧,所以大家看这个窗口结束时间是放到了上面,其实如果要是有一个换行的话,窗口结束时间就会到里边来,你看当前我们这个是不是10:35 10:40 10:45,五分钟一个窗口对吧,五分钟关一个窗口,当前的商商品的热门度按照它的这个排序,TOP5是不是全列出来了,商品ID是什么,热门度是什么,就都放在这儿了,对吧?这就是我们想要的这个最后的结果啊。
我来说两句