00:00
我们现在拿到这个窗口聚合的结果,接下来大家来看一看里边到底是什么东西啊,我们知道里边拿到的就是item view count对吧?所以其实是针对之前我们分组的数据要做这样的一个聚合结果输出,那大家看前面这两条数据,这就是到11点的时候输出的两条数据,对吧?诶,前面这是item ID是一,哎,我统计出来有四个item ID,二有三个他们,诶对应的这个窗口呢,都是十点到11点,当然本来我们那个里边包装好的是一个window and,这里为了让大家看的更加清晰,我们把这个窗口本身表示的这个呃,时间范围都写出来了啊,这个本身含义是一样的,那然后呢,诶,大家会想到这是到11点的时候,同时几乎同时EE输出了这样两条数据,对吧,然后是11.05分到11:05的时候,他会突然EE输出这样两条数据对吧,这都是10.05~11.05的数据,那后边我们如果要排。
01:00
去的话你怎么排呢?难道说你会把之前所有的数据都放在一起去排队吗?诶,你到这个11:05的时候,你说排名第一的是这个十点到11点窗口内的这四个吗?这这是排名第一的吗?当然不能这么干对吧?哎,我们当然要的是当前时间窗口内这个排序才是有效的,所以接下来我应该得做一个什么操作呢?诶大家大家注意啊,这里边最简单的一个操作就是再按照window and去做一个分组不就完了吗?对吧?啊,这样一分组的话,那就是每一个窗口输出的结果分到一个组里边去做排序,你跟自己排序对吧?你不要跟之前那个历史上,呃,其他别的窗口输出的那个数据去排序,这样不就好了吗?哎,所以这样的话,我们就可以直观的把它们都区分开了,另外还有一个就是大家可能会想到我,我这里边要做排序的时候,是不是,其实这就涉及到一个什么,我。
02:00
还得去把每当前窗口里边输出的每一个商品对应的那个统计值都先保存下来,最后才能排队呀,对吧?哎,你不能说是之前的那个数据,这个到了之后我就呃,就是直接就排嘛,因为你现在来了一个数据,你就说他是排名第一嘛,那肯定不知道啊,那肯定是不停的来,对吧,到后边我们才会收集起来,才会做一个排排序,当然这就又涉及到呃另外的两种思路,一种就是我们前面说的,你可以来一个,就相当于我们要保持一个状态,保持一个列表,对吧?啊,这个列表里边呢,来一个就插入一个,来一个就插入一个,然后每次就做一个插入排序啊,这是我们可以自己去做的一个操作。那另外还有一个非常简单易行的方法是什么呢?因为大家想到他们几乎就是同时,后边我们就会收到对不对,因为它本身就是同时这个,呃,输出的一个统计结果嘛,十一十一点零五分这两个数据就应该几乎同时。
03:00
同时输出同时收集到啊,那所以接下来我其实怎么样呢?我只要稍微比方说我稍微等他一点点,就是比方说我等它一秒钟,那是不是很快这些数据就都都到齐了啊,然后我就把当前的这个数据都统一输出不就完了吗?啊对吧,而且前面我做过分组了,我也不用,我也不用再去摘,按照他的那个时间,呃,到底是不是我当前要要统计的这个组,对吧,我也不用再去宅了,因为已经做过K了嘛啊所以接下来这个操作就非常非常的容易,那当然了,我们这里边说的是我等一秒钟,其实大家知道根本不用等一秒钟啊,因为理论上来讲,我们这里就是同时输出嘛,你等个100毫秒对吧?啊或甚至你等个一毫秒是不是都可以啊啊,这完全是没有问题的啊,所以接下来大家就看一看后续的操作到底应该怎么做,那首先就是KBY,对吧?根据window end进行一个分组,我只统计自己当前这个组。
04:00
里边所有的数据去排序就可以了啊,那然后接下来怎么办呢?接下来当然得定义一个状态了,呃,定义一个状态,然后还要去,呃,就是把所有的状态保存起来,还要去干什么呢?等到他们全收集齐了之后,我们说就是一秒或者100毫秒对吧?或者一毫秒之后收集齐了,然后输出它最终的排序结果啊,所以这个过程大家想想是不是还要注册一个定时器呀,对吧?哎,就是还要做这样一个等待和定时触发的操作,那既要做状态编程,又要定义定时器,我们该用什么了?大招,Process方式对吧?大家回忆一下之前我们讲过的那个最底层的API,所有事情都能做的,那那那个API啊,Process方式,所以接下来我们其实要中用这个process function来做这个最终的排序啊,那大家看一下我们这状态编程的思路,那当然就是。
05:00
分组之后的这个数据啊,根据window分组了,每一个组里边每一个window对应的数据,我们呢,都定义一个list state,那大家想这个list set当然是一个kid state对吧?根据根据K来保存的state嘛,哎,这还有这样一个好处,就是我这里边就直接可以用kid state了,它的这个类型比较丰富,哎,那这里边当然里边存着的就是当前所有的那个item view count,对吧,你来一个count,我存进去一个,来一个存进去一个,最后呢,就是收集齐了排序不就完了吗?这这里边不就是个列表吗?哎,我直接把它做个排序就完事了,后边要做的这个排序操作,哎,那大家想到这个既然是要用到了定时器,那就得用一个process function,而且我们还是做过分组之后的,对吧,还定义了k state这样的一个一个process function,那当然就是一个kid的process function啊,啊,所以当前的这个kid process function里边,我们就是针对当前。
06:00
每一个K,也就是每一个窗口里边的所有数据,每来一个就添加到当前的list state里边去,对吧?然后呢,我就注册一个,呃,比方说窗口结束时间再过一毫秒或者再过100毫秒,我触发一个计计算,触发一个排序输出操作,这样的话就是最简单的实现啊啊,那后边我们可以当然了,这里面还涉及到这个key process方式里边的一些,呃,一些方法和一些就是生命周期对吧?大还记得比方说open生命周期,大家还记得吧,可以做初始化对吧?呃,我们可以去定义状态,在这里边定义状态,然后那个process element呢,这是每来一个元素都会调用到的那个方法,另外还有一个on timer,那就是定时器了,哎,我们要做的那个排序操作得得在这个里边做,对吧?排序输出最后的核心其实都在这里边的,好,然后整体的这个流程,如果大家再看一眼的话,那就是open生命周期的。
07:00
时候,哎,我们创建一个listen state,对吧?定义一个状态用来存储数据,然后接下来呢,哎,是每来一个数据都会调用process element,那我们就把它直接添加到这里边来哦,大家如果想做那个增量式的插入排序的话,你也可以每来一个就直接把这里边就排个序,对吧?这个也是可以的,那最后on timer出发的时候就相当于不用做排序了,直接输出就完了啊,这也是也是OK的啊,就相当于一个增量的处理的方式,呃,但我们这里边因为本身list的底层就有排序方法嘛,我省得自己再去做这个调整了,那我怎么样呢?这里边就只负责添加,添加完了之后注册一个定时器,哎,你比方说我加100毫秒对吧,就window and窗口的结束时间过100毫秒,那大家想肯定你之前该来的都到齐了对不对,哎,就是这主要是用来区别什么呢?就是我这里边在输出这个统计结果的时候,尽管他们都应该是11:05输出的。
08:00
但是大家想,既然我是流式输出,就比方说你直接打印在我们控制台里边,是不是他俩肯定还会有先后啊,对吧?啊,至于谁先谁后这个这个说不准,但是他俩肯定是有一个先后顺序的,所以说里边我到底是接收到谁的时候,我就开始开始这个做排序呢?那这就不知道了呀,我也不知道这里边这个窗口里面到底有几个数据输出啊,没有这个结束的标志,对吧,那怎么样呢?那我就只有等时间过了11:05这个时间点的时候,你到11:05:01零一百毫秒,是不是肯定这个时间点过了,过了是不是前面的这个数据肯定输出了啊,所以主要是这样一个思路啊,啊,特别是我们现在很可能用的是事件时间对吧,那就是water mark只要过去了,那就所有的这个就都到齐了啊,那这里边主要就是做了这样一个操作定义,这个到它触发的时候,我们就把这个list state里边所有数据拿出来做一个排序,去前几名输出。
09:00
这就完成了,这就是一个实时热门商品统计啊,完整的处理流程。
我来说两句