00:00
呃,大家看到现在我们是已经把这个呃代码已经实现了窗口得到聚合结果的这一步了,然后在接下来呢,那就需要基于这个聚合的结果,哎,我们就是针对每一个窗口的数据做分组,然后再哎自己定义那个状态,把它收集起来对吧?然后再去排序,我们还要定义一个这个呃,比方说100毫秒或者一毫秒之后的一个一个延迟触发的一个定时器,对吧?到那个时候再把所有的数据做一个排序,所以接下来的操作其实也就是一个我还是重新定义一下啊,这个叫result stream,就是a j stream,基于它先去做一个K对吧?哎,这里边我可以去按照当前的window and去做一个分组,然后呢,那接下来既然是用到了状态编程,而且用到listen state对吧,还要用到一个定时器,所以当然我们直接就。
01:00
Process上大招,Process里边要自定义实现一个key的process function,那这里边我们自己定义一下它的这个名称吧,我们要做的其实就是提取top嘛,对吧,比方说我要的是TOP5,呃,Top n top items啊,然后呢,我可以把这个五直接作为参数传进来对吧?哎,这这样的话,我当前这个类你就给top几都可以嘛,通用性就会更加强强一些,所以接下来大家看到我这是按照窗口分组收集当前窗口内的呃,商品商品count数据对吧,它的那个统计值,然后我们这里面就是自定义,自定义处理流程啊,那接下来我们看看这个自定义的。
02:00
Process function到底又应该怎么实现对吧?这是我们的这个自定义pro kid的process function,好,来,那我们把这个写出来,当前这个是top n,呃,Hot items对吧?这里边的I我们可以直接给它一个名字,这个就叫top size吧,当前给的这个前几名的一个一个一个大小啊,想要截取的一个大小,它需要实现的这个大家就熟悉了的process function,然后里边我们知道它要传的类型是KIO对吧?哎,那当前的这个K是什么呢?诶,这里把这个引入啊,K其实我们看到它又是基于window and window and本来是长整形,但是大家看我们当前这个传一个字符串的时候呢,它又是得到了一个Java temple对吧,又是一个Java元组,所以这里边呢,我们还是要给的这个类型是一个。
03:00
Temp类型,元组类型,然后接下来IO,对吧?传入的数据类型呢?就是item view count,当前我们的数据不就是这样吗?那输出的数据类型是什么呢?哎,我们当时说你可以直接把这个排好序的IW count输出,这个也是没问题的,这个还简单一些对吧?我们这里做的稍微复杂一点,把它格式化成一个字符串,可视化的做一个展示,那所以它的输出就变成了string类型了,对吧?哦,那所以上边如果这里啊,大家看这已经没错了啊,如果我们要把它写出数据类型的话,最终是得到了一个data stream stream对吧?啊,那后边如果我们要看这个当前的效果的话,就直接把它做一个print打印就完事了啊,这就是我们最后的一个输出的过程了,当前我们主要还是要实现这个K的process function,大家看一下,必须要实现的方法只有一个,那就是process element,每来一个元素都会调用到这里来,对吧。当前这个元。
04:00
就是这里这个value,然后呢,呃,因为是K的process function嘛,有上下文对吧,这个上下文里边有那个timer service,我们可以去注册定时器可以获取,呃,这个当前的处理时间和water mark,哎,有很多事情都能做啊,很多信息都能够获取,另外呢,输出数据的时候用这个out对吧,Collector这个用它的这个collect的方法啊,那所以接下来我们是要先把那个状态定义出来,因为有状态编程嘛,对吧?啊所以我们先定义状态,这是一个list state对吧?啊,就是每一个窗口都应该有这样的一个list state,保存当前窗口内所有的这个商品对应的那个count值啊,那所以这里边我们把它就直接定义成,哎,那大家知道,如果说我要用这个生命周期的这种定义方式的话啊,因为大家知道就是一开始在这个类本身创建的时候,还没有上下文嘛,哎,我们定义。
05:00
K state都要基于运行是上下文才能够获取它的状态距离,所以这里边定义的时候呢,可以在外边先声明出来,然后到open生命周期里边,从运行上下文里边去获取,对吧?获取到它的那个具体的那个运那个状态的句柄,那这里边我们定义一个,呃,当前这个啊,就叫做item will,诶,就是我们当前要收集的,其实就是当前的那个item will count,对吧?Item will count list state对吧?啊,这个整的稍微长了一点啊,但是大家能看得清楚一点,好,那本身是一个list state类型,它里边的数据类型是什么呢?大家想到我需要的其实也就是一个item,一个count,是一个二元组,哎,那何必那么麻烦呢?Item will count里边也就多了一个这个window窗口的信息而已,我直接把它传进去完事了嘛,对吧,所以我直接就把这个item view count作为当前的这个list state。
06:00
数据传进去,然后在外面声明的时候呢,先给一个空值,然后接下来在定义这个,我们获取那个运行上下文去做定义,那应该在open生命周期里边去做处理了,对吧?呃,在这里边我们定义的这个item will,那就直接可以去赋值了,Get runtime contact,大家还记得吧,直接可以get,现在get什么呢?List state,然后里边去new一个list state script,对吧?啊,里边的类型是item you count啊,然后啊,接下来里边大家还记得要给一个name当前的名称,哎,我们当前这个就叫item will count will count list对吧?呃,随便给一个啊,我就叫这个,然后里边还要给一个class of类型item will count,这样把它定义。
07:00
对吧?啊,当然就是前面这里边我们定义这个bar的时候,你也可以加上一个private,就是表示只有当前的进程能够访问它的这个,呃,当当前我们的这个类可以访问这个状态,对吧?别的地方是不能访问这个东西的,好,这样可以把它在open生命周期声明出来,然后接下来就是process element了,每来一条数据的时候,到底怎么做这个操作呢?诶,我们其实想要的就是。来一条数据是不是就直接把它添加到这个里边去做排序就完了呀,而且我们说这个排序我们直接扔到后面,那定时器触发的时候再排对吧?你如果想要做优化的话,可以说每来一条数据就排一次,每来数一条排一次,我们现在不做优化,那就加进去就完事了嘛,所以这个特别简单啊,就是我们这里边就是每来一条数据直接加入list state对吧?诶,所以我们其实就是调用了item welcome list state啊,它有一个方法大家还记得叫做at对吧?哎,那这里边at的方法要传的呢,就是对应的这个item view count,那我们value本身就是嘛,传进去就完事了啊,另外还得做一件事干什么呢?注册定时器对吧?啊,注册一个呃,Window end,比方说我们直接加一毫秒之后,对吧,加一之后。
08:26
后触发的定时器,诶,那大家记得这个定时器是ctx上下文里边timer service对吧?然后去register even time timer,那这里边的这个window and从哪里去取呢?大家还记得,诶,这就是我们这个把这个value包装成item will count的好处,对吧?这里边value里边本身就有一个值叫做window and,然后它再加一,这里边本来就是毫秒数加一是不是就是一毫秒之后啊,注册一个定时器等待出发就可以了啊,所以大家看这非常简单,就是来一个添加进去,来一个添加进去,那什么时候去排序,什么时候得到结果呢?一毫秒之后,哎,大家想现在是事件时间嘛,Watermark控制对吧?Watermark到一毫秒之后的时候,那是不是就表示在这个这个window and加一啊,呃,假如说这个window and,我们就是九点钟对吧,那九点加一毫秒,这个时刻摩rock来了,那是不是说明。
09:26
九点之前的所有数据肯定都到齐了呀,对吧,那要是这样的话,是不是九点之前要关的那个窗口也肯定都关闭,都已经把数据都已经输出到这儿来了呀,对吧?所以这个就可能表示我们之前确实数据是都到齐了,因为你如果不用这么一个判断的话,它没有办法判断,你说我们前面这个,呃,每来一条数据,它里边的这个window and都一样啊,都是九点钟啊,那你说到底是接收到哪条数据的时候,我就可以输出这个排序的结果了呢?没准啊,那那我就一直等吧,你到底等到什么时候呢?我也不知道对吧,我并不知道当前窗口里边到底有多少商品的,它的这个count值统计起来了,所以这里边我们一定要有一个标准,那这个标准就是等所有数据都到齐,都到齐的标志是什么呢?当然就是water mark过了window and,那之前的数据就都到齐了,对吧?哎,所以这里边我们加上一毫秒,一毫秒之后触发,然后这里大家可能还有一个小疑惑,就是哎,你要这。
10:26
说的话不是说每来一条数据都会调process element吗?那这里面难道是每来一条数据都重新注册一个新的定时器吗?这会不会这个重复注册了呢?哎,这就是又是之前跟大家说的定时器里边的这个ID靠什么来区分定时器呢?就靠当前的time step对吧?所以如果说这里边我们看到value.window and,所有当前的这个数据,不管是哪个value来了之后,它里边的window and前面我们是按照什么来分组的?不就是按照这个window and来分组的吗?所以如果它能够分到同一组,进入到同一个,呃,这个后面我们的这个process element这个方法里边来的话,那这里面的window and是不是肯定都一样啊?如果window and都一样,时间戳就都一样,那是不是就相当于是同一个定时器啊?所以这里边我们就相当于偷了个懒,对吧,本来应该是,呃,在另外有一个状态去判断是不是已。
11:26
已经注册过定时器了,如果没有注册过定时器的话,再去注册,注册过的话就不要注册了,但这这里边或者说我们判断一下是不是第一条数据对吧?有这样一个状态,这里边我们投了个懒,就是因为即使重复注册,他最后也一样一样效果,还省得我们可以少注册一个状态对吧?所以就用了这种方式,大家理解一下就可以了啊这个还没完,最后我们关键是要定义一个on timer对吧?啊,就真正最后我们要做的这个排序和输出操作都在这儿呢啊这里是呃,当定时器触发,可以认为所有呃窗口统计结果都以到齐,呃,那可以排序输出了,哎,这是我们的整体的一个逻辑,那接下来我们就在这个on timer里边,首先应该要把当前的这个,呃,就是当前的这个状态list。
12:26
要拿出来对吧?哎,你要不然的话,这个list set它没办法直接去排序啊,那所以呢,我可以另外再去定义一个数据结构,比方说啊,这个为了方便排序啊,另外定义一个,比方说我定义一个list buffer吧,这样的话可以直接动态的往里面添加数据,对吧,我就挨个的一个一个把那个数据拿出来塞进去就完事了啊,那个保存list保保存啊list state里面的所有数据,哎,所以这里边相当于就类似于我定义一个变量,就把这个值再倒出来,对吧,类似于这样的一个操作而已,但是大家知道就是我这里边这个,呃,Item will count list,你这里边直接get之后,诶这里边其实你直接get去去遍利这个也可以,对吧,但是它当前我们这get之后拿到的是一。
13:27
啊,这里边的一个out类型对吧?A pending state的一个out类型,那当前的这个out类型呢,其实这里边本身它是一个可迭代orable的类型,对吧,大家看他有这个for each和这个呃,Itator方法啊,但是呢,你直接如果要要对他这个做salt,这个做做这个sort的话,这个是没办法去salt的,所以它其实还要再去啊,就是相当于再再转换成一个list对吧?啊要要有这样一步操作,所以这里边呢,我们就另外定义一个这个东西啊呃,我们把比方说这个定义一个all item will counts对吧?哦,Item counts,呃,然后它本身那就定义一个list buffer了啊,我们直接就用这个tla里边的Li buffer数据类型,它里边每一个元素当然也是item view count了啊,我们直接先创建一个空的,直接用这个,呃,它的半生对象啊。
14:27
把它创建出来,创建一个空的,然后接下来呢,嗯,那大家就知道我可以去用它的那个迭代器对吧?呃,当前它既然是一个迭代器类型嘛,那我可以就是well,呃,这里边啊,我先把这个eer先定义出来本身。就用我们当前list state的这个get对吧,然后定义它的这个eerator,然后接下来呢,哎,我们就可以用迭代器的方法去迭代了嘛,如果当前的it has next的话,哎,那怎么样呢?就把当前的这个元素添加到当前的all item view counts里边去,对吧?那当前我们可以直接调它这个加等于方法嘛,哎,我直接用这个it.next把它获取出来,添加进去就完事了啊,所以这就相当于是一个获取它的这个里边所有的这个数数据啊,放到另外一个可排序的一个数据结构里边,只是这样而已,然后既然已经都倒出来了,那是不是就可以清空状态了对吧?哎,我们提前清空状态可以节省这个内存空间啊呃,这个清空状态,因为我要用的东西都已经在这了嘛,哎,所以直接这里边item state welcome Li state啊,调用clear方法把它。
15:46
凭空,然后接下来呢,排序对吧?按照按按照count大小排序啊,那这里边我们要得到的是一个排序之后的saled list will counts啊,那这个就不用写类型了,对吧,直接转换就可以了,大家都知道啊or,基于这个all item will counts做做排序转换,那大家知道可以thought by,也可以thought with对吧?哎,那这里边我可以直接这个thought by thought by,当然就是BY这个count字段了,但是大家要注意count字段这个这个呃,进行处理之后,本身应该是按照从小到大排序的,对吧,我现在是要从大到小排序啊,然后取前几个嘛,哎,所以这里边呢,还要再做一个反转对吧?哎,那大家可以去直接你到这儿的时候去去重新去做一个reverse,但是大家知道,这相当于是做了两遍操作。
16:46
就是我先排好再去re reverse,那能不能直接排的时候就按照倒叙的排呢?诶大家记得可以传一个影视的参数进去对吧?用颗粒化的这种这种传参方式啊,直接传一个ordering ordering参数对吧?这个大家记得吧?啊那那这里边我们可以把这个类型定义好啊,ordering.long然后reverse把这个传进来,这就表示当前的这个长整型值要倒序排对吧,从大到小排,然后最后提取前N个对吧,我们这里边take,我们叫那个N是top size,直接把它拿出来就完事了啊,这就是当前的这个操作啊,取前N个对吧,这就这就排好了啊,所以如果说我们不做别的那些可视化定义的话,我们直接把这个列表啊里边的每一个,呃,具体的这一个item view count的那个数据挨个输出。
17:46
就完事了,这就是我们最后的结果了,好,那当然了,现在我们还要做一个可视化,所以接下来呢,还要做一个操作啊,我们将排将排名信息格式化成string,呃,做便于打印输出可视化展示啊,那既然是要弄成一个string,那大家知道这个string我们肯定是要拼对应的那些呃,那些字段对吧?啊,那所以这个大家大概知道一下就行啊,那我先定义一个呃,Spring builder对吧,或者我定义一个spring buffer都可以啊,我定义一个spring builder好了,呃,然后这里边我先把它扭出来,然后在后边直接去做这个添加就完事完事了嘛,对吧?呃,这个result啊,最后的结果我直接end,然后写一下啊。
18:46
当前的时间是什么?其实就是窗口窗口结束时间对吧?啊是什么,然后后边呢,当然要end当前的那个时间,当前时间是什么呢。
19:00
诶,有些同学说这这不知道啊,我哪知道当前实验室什么呢?诶你找我们的那个key嘛,因为现在KBY的时候按什么做的做的key by不就是我们那个window end吗?对吧?哎,所以这里边我们直接从K里边是能拿到window end的,Window end就是结束时间,那当前的key从哪里拿呢?哎,现在大家看on timer里边也有一个上下文,有一个ctx啊,然后当前的这个,呃,就是另外还有一个这个time stamp,对吧,Time stamp。这里边我们其实直接是可以知道它是什么呢?就是之前的那个window and加一之后的那个状态,对吧?哎,所以这里边有一个最简单的操作是我直接把这个time stepmp在减一不就完了吗?对吧?哎,这就是当前的这个时间time stepmp减一,哎,但是有同学可能想到这里边是一个长整形的数啊,我直接说这个窗口结束时间是一个长整形的时间,戳看不明白对吧,不知道这是到底什么时候,那我要更加可视化的好一点的话,那我把它包起来是不是转换成一个就是我们这里边这个time stamp这个数据类型啊,对吧,Java CQ里边的这个time stamp转换成这个东西啊,然后呃,当然了,就是后边,这里边其实我应该判的话,要调一个它的a toth string方法对吧,但这里边知道你这里边可以自动类型转换嘛,直接调它to string方法,大家看这个不报错,诶,那我们直接就输出了啊time stamp的to string方法。
20:30
然后后边这是当前的这个时间,后边要格式化的话,再换一行对吧,换一行再操输出,这是当前的窗口啊,然后接下来呢,大家是不是想到我得遍利这一个这个列表里边的前N个对吧?然后挨个每一行输出一个第几名是谁,第几名是谁啊,相当于就是一个循环便利了啊,这个便利结果列表中的每个item will count,呃,输出到一行对吧?哎,那接下来当然就是一个for循环了,哎,我们直接做一个这个呃,For的一个推导式啊,一个表达式scla里边,我这里边要的是什么呢?因为还涉及到第几名,所以说我对那个就是编号那个角标也感兴趣,对吧?我不能直接便利这里边的元素就完了,那我要遍利角标,那便利角标的话,我得,诶有一个。
21:30
大家可能知道我可以用它的那个点lengths,然后就是从从零到它的那个until,它的那个点lengths,对吧?但是那个到底是until还是two,那个还有点麻烦,我们直接有一个简单的写法是什么?直接用它的indices,用它的索引拿出来就完事了,然后我们去遍历那当前current item will count,当然就是啊,就是当前的这个saled里边的第二个,对吧?呃,就先拿出来,然后接下来我们就在后边去啊,把它做一个追加了啊,直接判现在是第几名对吧,我们加一个number。
22:10
呃呃,直接number,后边直接直接加吧,对吧,不需要再做这个冒号了啊,比方说呃,Upon的一个I,哎,注意是I加一对吧,因为我们角标本来是从零开始嘛啊,我们出出当然是从这个第一名开始输出了numberone开始对吧?啊然后后边在aend这儿我们再加上一个这个冒号再空格对吧,再去做输出,然后接下来后边就继续aend,这里边aend的是接下来要的是一个商品ID对吧,Item ID啊,我们就写成汉语啊商品ID,呃,因为上面已经有这个冒号了,这里边不要不要给冒号了,我们只直接给一个等号吧,对吧?诶这商品ID,然后接下来的当然就是current item will count的item item ID对吧,本来就是呃这这里边我直接在这儿的话,直接可以调它那个to string方法对吧。
23:10
所以直接放在这儿就OK了,然后后边继续去做这个ipad对吧,直接追加到后边啊,然后这里边追加到后边的话,可能我这里面还要加一个空格对不对,就是方便大家看的这个展示看的好一点啊,或者说你直接加一个这个杠T也是可以的啊,比方说我们这里边加一个杠T对吧?啊,然后下边来一个接下来就是当前的啊,就是热门度对吧,或者说其实就是它的那个浏览量嘛,热门度。后边去current item will count里边的count值拿出来就完事了,对吧?啊,其实就是这样,然后最后这里完了之后给一个给一个回车换好,哎,这样的话,当前的这个输出就就完成了啊,那每一次这个for循环完成之后呢,我们这里边可能就是不同的这个窗口之间可能在隔上一行对吧?呃,比方说我们这里边在这个呃的一个分隔符吧。
24:17
就是我们直接用这个一行横线分隔开对吧,再杠N,再再把它做一个这个换换两行吧,隔得开一点啊,然后为了控制我们的输出显示频率,我们输出完一次之后,因为这里边我们读取的这个数据太快对吧,从文件里面读取啊,这个太快,所以说控制输出频率,隔一段时间我们停一下对吧,输出一次窗口之后停一下,模拟这个实时的效果啊,给一个一秒钟的暂停啊,那注意现在这还不是输出了,最后输出要怎么样输出呢?你这个result,这只是定义好了一个result吗?啊,那你得把result直接写到我们输出缓冲里面去啊,用out.collect大家还记得这个吧?out.collect把result最后我们要的是一个string,所以说把这个string build调它的to string方法转换输出就完事了,这就是整个完整的流程,到这里我们就全部结束了,最后可以看到它的效果。好,这。
25:18
代码已经写完了,那接下来我们运行一下,看看效果怎么样,看到现在这个已经运行起来,接下来我们就就是静静等待它输出最后的效果了,大家可以看一看最后是不是符合预期的啊。哦,当然这个稍微会有一点慢啊,大家会看到就是前面我们定义的这个处理过程处理的,呃,本身处理的内容还是有点多的,对吧,而且就是本身这个文件里面的数据本身也比较多啊,同时我们读取要要读取这个大量的数据进来。大家看到这里边我们输出的结果样子是这样的啊,然后第一个窗口这个09:05的时候关闭了一个窗口,对吧?然后我们看这个按照这个排名啊,第一名商品ID是这个热门度啊,这个就比较小啊,后面大家看到每五分钟一个窗口,接下来这个这相当相当于这个数据就会越来越多了,对吧?哎,所以当前我们这个其实现在还是在不停的不停的输出的,对吧,我们看到这个当前是动态输出的一个过程啊,因为是每隔一秒钟,我们要就是做完一个窗口输出之后要停一秒钟嘛,所以这里边是不停在输出的一个状态,然后你看到到后边的话,我们一个小时之内最热门的热门的那个商品,它的点击的次数可能会达到20多次啊啊,这个差不多就是一个当前这个数据的一个真实的反应了啊啊,那大家可以看到这个过程就是这样的一个结果,当然我们这个可视化呢,可能做的也没有那么好看是吧,你如果想要再看的更清晰一些,比方说中间我们再隔上几行可能可视化。
26:55
它的效果会更好,但是我们其实已经可以非常直观的看到这个效果了,如果我们外部写一个这样的程序,对吧,你在大屏上做一个滚动显示,哎,这其实就是一个典型的应用,大家可以先把这个呃下来之后把这个代码先敲一敲啊,感受一下这种应用的过程。
我来说两句