00:00
好,上节课我们已经实现了这个自定义的窗口预聚合函数和窗口最后输出的这样的一个函数,对吧?Window方式得到了,现在得到的结果就是每个窗口做聚合完成之后的那个结果了。就是我们知道了每个窗口里边每个商品它的技术那个count的值是多少,那接下来我们要做的是什么?是排序,但是排序之前还必须得对,按照窗口去分组,对吧?哎,所以这里边我们还要加上一步。这里面我还是写一些注释吧,这里是呃,窗口聚合对吧?然后接下来是不是要按窗口分组啊,这个分组是不是又是一次KBY啊,当然了,这里面KY大家可以直接按照当前的window and去做KY就可以,对不对?所以按照窗口分组。
01:02
然后接下来是不是就要排序了啊,这个排序稍微麻烦一点,可能要拿到所有的数据,最后再一起处理,那我们直接,而且有可能要用到定时器,对不对?这里直接上大招process,写一个process方式,那这里边又得自定义了,我们这个就叫做top n。Hot items啊,那这里你到底hot几,是不是这里边可以传一个参数进去啊,比方说这里边三个或者五个,我们直接传一个参数进来,接下来只要实现这个自定义的处理函数就可以了。最后,最后一部分内容,我们要做自定义的处理函数。Plus。Top hot n。It对吧,好,然后接下来这里边这个I,我们觉得这个不太舒服的话,直接给它叫top size吧。
02:07
一共几个输出几个啊,那这里边它需要去实现一个什么东西呢?对,大家想到我们是不是做过K之后了,已经是,所以这里边我们可以实现一个,而且是就要按照不同的key是不是保存自己的状态啊,所以一定实现的是一个key的process方式,它里边的数据类型KIO大家还记得吧,对吧?那这里边的key应该是什么类型,诶,这里边我们已经用这个,对,用这个,呃,Select选择器把这个window and选出来了,那它返回的那个key的类型是不是就应该是window and的类型啊,所以就应该是long,所以这里边写一个long。假说IO它的输入是什么呢?当前的输入状态,Item will count,对之前的那个数据结构输出啊输出,最后我们要在控制台打印输出的话,我直接给一个string吧,对吧,我包装成一个可视化的这样一个string来显示,就像我们实时监控一样,把这个实现一下,大家看把这个写完之后,上面是不是就都没有问题了,没有这个语法报错了,接下来在这里边要实现。
03:23
必须要实现的方法是不是有一个process element啊,就是每一条数据来了之后怎么干,对吧?那大家记得我们的想法是每条数据来了之后干什么呢?每条数据来了之后怎么做啊。是不是就是要来一条数据,就把它塞到状态里面先保存起来对不对,然后注册定时器,等到所有的数据都到齐了之后,触发定时器排序输出啊,所以它其实就是把每一条数据都存起来就好,把每条数据存入状态列表。
04:06
那所以这里边我们是不是前面还得缺了一点事情,对吧,还得干什么呀,是不是还得先定义状态啊,所以这里边我们首先在外面先把这个定义出来啊,比方说private。Item state啊,这叫item state吧,这是一个list state,对,因为是一组啊。它里边的数据类型是什么呢?Item will count,因为每一个元素来了之后,是不是都是item will count,对吧?是不是每一个元素都是这样的类型啊,所以我们最后保存的时候把它直接存到这个list state里边,那么这个state是不是它的类型,就是每一个元素的类型也得是item没啊,在外面我们定义的时候直接先给它一个默认的空值,对吧?然后我们在哪里去把它声明出来呢?生命周期对,有open。
05:07
在open这个生命周期里边,我们可以做一些操作,一个基本的操作就是item state,给它赋值,对吧?怎么赋值呢?上下文get runtime contact,然后这个就不是get state了,而是get list state,对。然后这里面是不是要有一个list state script啊一样你一个list state script,然后里边一样也要给一个名称,对不对,Item state。It state。然后另外还得有一个类型,对吧,当前数据的那个类型告诉我flink到底是怎么做序列化,是不是class of item will count,所以大家看就是这样的一个过程啊,诶,这里边好,这里面是写的有问题了是吧。
06:08
那个没有。呃,是在哪里在哦,现现在这里是那个不可变的类型对吧,所以当然它会报错,所以我们把它改成Y类型,好现在接下来已经给了这个基本的获取到这个上下文里边的状态句柄,那接下来就可以操作了,对吧,那来了一条数据之后。那接下来怎么做呢?来一条数据就往里塞一条数据,塞的那个操作是怎么做来着,是不是直接调item state点它是不是有一个爱的方法就是追加对吧,直接把它追加进去,直接把value放进去就可以了,另外我们还得做个什么事情来着,对,注册一个定时器。
07:05
所以这里边注册定时器大家还记得吧,是不是要用ctx啊,它里边有一个什么,有一个timer service可以去register,我们这里现在要的是一个什么样的定时器呢?事件时间定时器,对吧?啊,现在全是靠这个auto mark来控制它出发的,所以even time timer这里面的时间我们就直接给value.window and是不是加一就可以了啊,或者你想加100也可以对吧,100毫秒相当于就是延迟大一点而已。迟一点出发而已。然后接下来真正核心的操作是不是要在要在on timer定时器触发的时候去做,所以我们最后还要把这个on timer做一个实现。
08:01
定时器触发时。对所有数据排序。并输出结果,所以在这个过程当中,其实我们是不是先得把那个存着的状态先拿出来啊,对吧?啊,所以这里边啊,我们先呃将所有state中的数据取出啊,大家会想到如果说要拿出来的话,我放到哪里呢?什么样的一个数据类型比较适合放呢?可能得放到一个,是不是得放到一个list里边啊,对吧,所以这里边我直接那个放到一个list buffer吧,放到一个list。八分钟。好,那么这里边我定义一个叫all items,它是一个list buffer。
09:10
当然里边的数据类型就应该是item view po,对吧?啊,那我new一个list buffer。然后接下来,呃,这里大家注意,我接下来是不是应该要把这个item state里边的所有数便利它一个一个拿出来,塞到这一个all items里边去啊,那这里面大家注意一下,如果要想做这个就是类似于for each这样的一个便利的话,还得引入一个转换啊,Import,一个scale collection。Java convers得把这个东西引入,所以接下来我们可以便利它。那就是对于。
10:01
便利啊,在这个item state里边,大家注意他如果要拿他的数据的话,不能直接便利这个state,而是要先去get,对吧。做这样的一个操作。诶,大家看你要实现这样一个放循环的话,是不是上面直接就变亮了呀,说明下面被调用了对吧?所以接下来大家看怎么样便利的过程当中,怎么样把它拿进去,放到这个all item里边呢?啊这个非常简单,既然是list buffer嘛,是不是直接加等于item就完事了,塞进去完事儿啊,这就是这个过程啊。好,呃,然后接下来是不是就可以按照。按照点击量,呃,就是我们统计的那个count对吧,按照count大小。排序。
11:01
并格式啊,这个这个后面我们再说吧,先做排序吧,对吧,排序之后我们可能单独定义一个新的这个list,那这里边我们把它叫做salty items。它是不是要在all items的基础上直接要做一个salt呀,既然是list buffer,那直接是不是可以有哎,Salt with也可以sal by对不对?哎,这里边我们用一个thought by,我们现在要BY什么呢?对,是不是要根据这个count这个字段进行thought它啊,哎,那大家想一想这个thought by本身应该默认是一种什么方式啊,应该是降序吗?默认是升序对吧?那如果要默认是升序的话,我们现在怎么样能把它变成这个降序排列呢?我们想要的是降序对吧?从大到小嘛,哦,这里边可以再给一个参数对不对?这里边可以给一个ordering的这个参数啊,Ordering大家看到它本身的类型是long,然后是不是可以有一个reverse啊反转对吧?啊,那那这个表达当然这是这个函数颗体化的这种表达了,对吧?呃,把这个参数这样放进去,所以大家看到这个相当于有了这个参数之后,我们前面是不是就是根据count去做了一个降序的排列啊,好,当然大家如果想用这个south with的话,可以用south with去实现啊。
12:31
啊,另外我们还要top n呢,Top n那就是so完了之后是不是,诶take对吧,直接take我们那个定义的叫top size直接拿出来就完事了啊,所以这里边是并取钱。N个对吧。然后接下来大家会想到这里边都已经这个,呃,该排序该做的事情都已经完成了,我是不是应该把之前的那个状态应该清空啊,释放内存空间啊,对吧。
13:09
啊,当然大家也可以等到,就是我们把那个数据全输出之后再清空也可以啊,啊这个就看大家这个习惯了,清空状态item state怎么清空状态clear,对,然后接下来是不是就是该有的这个排序是不是都已经有了,是不是就格式化输出啊对吧。呃,将排名结果格式化输出。当然这个输出这就大家随便自定义就好啊,所以其实核心就是前面这部分的代码到这儿为止,其实就已经都搞定了啊,那后面这一部分呢,我们就让大家看的更,呃,就是明显一点,我们单独的再把它做一个格式化,这里边我们result就既然是stream嘛,那就用一个stream builder。
14:05
然后去构建我们的格式化信息,你有一个string builder对吧,然后接下来是不是这个result直接可以往上end。字符串啊,对吧,这里边比方说我们先输出一个时间。时间是什么呢?当前的那个窗口的信息对不对,当前是哪个窗口的那个数据,那这里边a pen是不是应该把那个time STEM传进去了,但是你如果直接传一个time STEM,这个我也搞不清楚它是什么东西啊,所以是不是还是要对转转换成可视化的东西啊,我可以直接调用这个方法啊。直接用这个new一个time step,好,那大家看这个是不是直接传一个时间戳进去就可以把它转换成一个。
15:00
这个呃,C time stamp啊,然后这里边是不是要传一个long类型的时间戳啊,当前的时间戳应该是什么呢?这里边。是不是就是这里的这个time stamp呀。当前触发这个timer定时器的时间是不是就是这个啊,诶,那所以这里边我要传的就这个time吗。当时是不是它是在window and的基础上加了一的呀,我如果要想表示当时定义的那个窗口的关闭时间的话,是不是应该把它对再减一减回来对吧?所以是这样的一个过程啊。好,然后后边呃,这个完了之后,我把它那个杠N回车一下,对吧,然后接下来。输出接下来是不是就是输出每一个商品的信息了,对吧,每一个商品的信息,呃,这里的话,既然是每一个商品,我们是不是得便利这个salty it啊。
16:11
把它便利,然后每一个商品挨个输出就完事儿,所以当然是一个for循环,呃,那这里面大家会想到这个是不是应该是从哪里到哪里去便利呢,是不是?这这个下标应该是从零到是不是salted items,它的lengths大家注意是不是还得减一啊,对吧?啊,这这里大家会发现其实这个有更简单的写法啊,它这里边其实ID已经在提示了,可以用什么方法呢?那就首先。用until可能觉得就是稍微舒服一点,就是你不用再去再去减一了,对不对,对吧,Until可能稍微的舒服一点,另外还有一个更简单的方法是什么呢?直接用它的。
17:02
Indices这个所以啊,所以这个是不是就会因为这个你不管是to还是until,你是不是总得记着它到底包含不不包含最后那个对吧?啊,那这里边如果要是我们直接想要它所有的下标的话,是不是直接用这个就完事了,对吧?这也是一点小的这个改进啊,好,然后接下来具体的每一个我是不是先得拿到当前的那个item啊,定义一个current item,它是不是就应该是saled items里边的Di的元素,对吧?有了它之后所有信息都从它里边拿就完事了,那result.aend接下来是呃,接下来是呃,第几个排名第几对吧,我们写一个number。后面还要end number几呢,是不是第二个呀,这里要注意I是从零开始的,是不是还得I加一啊,对,所以之后再apad一个冒号。
18:08
然后接下来大家会想到是不是继续点PA的呀,对吧?啊,接下来这个我们可能就是空一格啊商品ID。商品ID应该是什么呢?呃,这里不要再再冒号了,我们直接等号好了,商品ID就是当前的。什么?是不是当前的商品就是current item啊?那这里是不是就直接从current。Item里边去取取他的item ID是不是就完事了。是这样吧,对吧?当前的商品不是已经从这个排好序的数据里边拿出来了吗?这里边的每一个元素是不是都是一个item view count啊,里边是不是包含了商品ID信息,商品的那个当然有window and,对吧?是不是还有那个count值啊,我们现在是不是就是每一个商品ID是什么,然后数量count是多少,拿出来就完事了,所以其实就是把这个东西拿出来啊,然后再end。
19:16
呃,接下来可能是他的那个,呃,就是就是那个count值对吧,我们就叫浏览量吧。A,那这个是不是要current item.count拿出来对吧,最后啊,当然最后我们在。做一个回车换行,哎,这样就可以了,对吧?所以当这个for循环便利完的时候,当前时间窗口的top n商品是不是就都输出完毕了啊,当然这个输出完毕之后,为了隔开啊,我可以再去给它呃判的一个一个分隔符,对吧,隔开一点,然后为了控制输出频率,我可以人为的对sleep一下。
20:10
thread.sleep。呃,这里边我们停个1000毫秒,停个一秒输出一个吧,看的清楚一点对吧,那大家看这样已经输出完了吗。这只是把这个result,这个stream builder搞定了,对吧?你最后要输出什么,得怎么样才算输出啊,对,是不是一定得用这里边的这个out把它collect才算输出啊,啊所以大家注意啊,当然输出的类型是string类型,所以out.collect collect这里边是不是直接result,注意result我们需要的是string,是不是这个string builder还得再to string啊,直接把它转换成string方法就可以了。啊,所以这就是我们具体的一个实现,这样整个这个过程就已经做完了。
21:01
啊,大家可以再梳理梳理啊,看看我们这个过程,其实最后这一部分看着好像还有点儿麻烦,其实很简单,就一个一个便利包装成自己想要的数据类型,做可视化,对吧,类似于做可视化了。核心的逻辑其实就是前边先把这个按照窗口做聚合,聚合好了之后再按窗口分组做排序,对不对,拿到所有的数据再做排序,那么怎么样能拿到所有的数据呢?我们把每一个数据放到一个状态里边,状态列表list state里边,然后定义一个定时器,定时器延迟触发,保证所有的数据能到。呃,这是这样的一个过程,好,那然后我们就测试一下吧,这里边要测试的话,我可以把这个data stream data stream不要打印了,我直接把这个,因为那边是在不停的读数据出数据的,对吧?我直接把这个processed stream直接打印出来。看一下效果。做一个测试,跑一下。
22:00
大家可以看到这个输出的结果啊。这就是我们这个输出的结果,大家看这个我们输出了一个时间窗关闭的那个时间对不对,这个时间是不是就相当于是五分钟一个啊,对吧,这个看的更明显一点,然后统计出来,哎,这个top n的这个浏览,浏览的这个商品大家看确实是一个降序排列,对吧?按照从大到小的一个降序排列啊,那当然这个时间窗口本身是一个小时的时间窗口,它统计的一个一个小时之内的数量,这就是我们这个代码的一个实现的结果。
我来说两句