00:00
节课我们在做讲解的时候,大家发现当前的这个热门实时的热门页面的统计,我们经过一个呃,处理延迟数据的这个引入啊,引入了这种方式之后,确实是可以将当前乱序的数据做很好的一个处理了,但是呢,最后输出的结果有一个bug,哎,那就是那就是如果是对于同一个URL而言,我们在处理乱序数据的时候,假如说我们这个窗口可以允许多等待一分钟的话,那么在这一分钟之内来的数据,它有一个有一个问题,就是说每来一条数据就立刻叠加在之前的那个聚合基础上,然后马上输出一个结果,对吧?所以当时我们看到这里边的这个AJ的这个结果啊,很快就是这里边,如果后面再来一条迟到数据,Agg的数据就又会输出一条结果,输出一个配置view count,如果我们输出多个,就是有多条迟到数据来的话,大家会发现这几条数。
01:00
对,最终都会进入到我们最后的这个process function里边来,到了这个process function里边来的时候,大家发现每一条数据它都会注册定时器,然后等到这个定时器触发的时候呢,我们都是要把当前的这个就是列表里边的所有的元素都拿出来,要去做一个排序处理的,对吧?啊,在这个过程当中,你会发现,如果说我有同样URL的这个数据作为这个item count啊,之前都已经输出出来的话,给到这个窗口聚合结果这里边都已经给,就是我们我们上游的任务都已经传过来的话,那在这儿我们这个例子里边是不做区分的,因为当前的process方式是以window and啊,以窗口作为分组的条件,那至于这个方分组里边你是同一个URL还是不同的URL,我们并没有去考虑之前我们在做这个操作的时候,这就这就是因为我们之前根本就不考虑他会有这种之前的那个。
02:00
呃,一个URL统计出来的数量还会去更改的场景,对吧?那它只有一个统计值,那我们当然直接排排序就完了,它不会出现重复的,但现在呢,因为我们后面还会去更改,还会去再来一个patri view count,这里边的URL有可能重复,这就会导致我们之前有可能出现这个就是大家看到呃,你排序之后的这个结果,哎,热门度这个排第一排第二的都是同一个URL,就变成它刷屏的这个状态了,那这个bug怎么去解决呢?呃,其实大家如果要是呃仔细去做一个思考的话,就会发现在这种场景下,显然我们还用一个例子的去表示当前这个状态就不是很合理了,我们应该怎么样去调整呢?应该在保存这个状态的时候,还是有key有value的一个状态,对吧?那就是前面我们这个分组的时候啊,K process function里边,这里的key是当前的一个window window end,但是我保存状态的时候呢,还要按照每一个URL,或者说每一个商品ID向前一个那个热门商品统计啊,按照这个ID再来做一个对应的保存,这样的话,后边我做这个更新它的这个,假如说来了这个迟到数据,更新这个count值的时候,我只是查询到对应的K里边对应的那个值,把它做一个更新就可以了,而不要去直接添加一个新的值。
03:27
啊,大家想到的这样一个思路对吧?如果我们要这么去做的话,那相当于这里保存的状态应该不是一个list,而是一个PY有对,而它什么样的是K有对呢?当然就是map对吧?啊所以这里我们做一个更改,做一个呃改动啊,我这里不要再用之前定义好的这个,呃,这这个是我们后边做排序的时候这个四八啊,现在我们是看一下这个一开始定义状态的时候。在这个方式里,大家看一开始我定了一个的,对吧,在这里我要把它做一个更改,那我们这里边要定义的还是用lazy的方式定义出来就不是list state了,而是我定义一个page view count map state,哦,那里边本身这个类型啊,还是map里面大家看到有两个范型要传入,一个是啊,就是当前我的那个K的类型,一个是value的类型,那我们现在K的类型是什么呢?URL嘛,那当然是string了,Value的类型我现在就存那个count值不就完了吗?哎,当前我们count值是长整性,所以我把它列出来,然后后边,哎,那这就是创建这个map,呃,Map的方法,Get map里边你有一个map statescript,然后大家还记得就是这个稍微麻烦一点,里边我们要传的这个类型呢,是不是也是两个啊,大家。
04:59
大记得这个吧,对吧,跟我们前面那个类型是一模一样啊,所以这里也是cake类型,String value类型长整形了,这里给一个名字,Page view count map对吧,啊,就是map的一个状态啊,然后后边同样这里边我们要有两个。
05:21
就是一个表示当前K的类型指定,另外指定当前value的类型,把它定义出来好,然后接下来,诶,那如果前面我们这个改成了BA state啊,那下面我们在做这个处理的时候,每来一个元素添加的时候怎么添加啊,其实也是一样的嘛,只不过就不要用这个list state的这个ADD方法,而是大家还记得如果要是去在map state里边添加元素的话,是put嘛,K value直接put进来不就完了吗?哎,我们现在要的就是它的URL以及。当前的count值对吧,哎,放在这儿就完事了嘛,然后同样还是去注册一个定时器啊,然后一毫秒之后出发,然后到下边这个触发当前这个状态的时候呢,诶这里面我们就是同样还是啊把这个住掉。
06:17
接下来我把这个就给大家,还是把这个就是全部住吧。因为后边的这个操作可能也会略有不同,我把上面这块都住掉啊,然后接下来呢,我们定义一个这个特,还是从这个状态里边啊,大家知道这个状态本身如果要去get的话,这里边我们是要get某一个K对应的那个值,对吧,而不是拿到所有的键值,对,那怎么样拿到所有的建池对呢?哎,大家知道有一个entries方法对吧?啊,就是这样拿到的就每一个entry都是一个key value这样的一个,对,我们先把它拿到,然后同样它也是一个可迭可迭代类型,我们得到这个eator,接下来去便利的时候呢,Well iter,这个还是has next,那接下来我们这个做的操作可能会稍微多一点啊,我先要拿到当前的这个entry,这是eater.next达到,因为最后我们想要的是什么呢?包装到这个,那就是前面我可以把这个也也住了啊,大家会想到我现在要用的这个,呃,比方说最后要排序的这个。
07:26
格离四八份也不是一个配置real count了,其实就是一个二元组对吧?哎,我最后只要拿到那个对应的K和value啊,一个URL一个count值就够用了S,所以这里边我可以把这个定义出来,Or page view counts,哎,其实还是差不多的一个定义啊,List buffer,然后里边的类型我定一个二元长型,然后这里边同样还是先给一个空的类4BUFFER,那后边这里边呢,就还是添加对吧?这里边要添加的注意是一个二元组,所以我要括起来啊,里边这个添加的元素外面括号表示这个方法调用,大家知道这个加等于本来这是一个方法吗?对吧?外面括号表示方法调用,里边的括号表示二元组,那当前我们要的是什么呢?Entry点哎,大家看到可以get t对吧,然后entry.get value把这两。
08:26
把它拿到传进来就完事了啊啊,那当然这里边还有一个这个提前清空状态,这里边大家要注意一下啊,诶这里边其实有问题的,就之前我们这个是没给大家详细测,所以大家可能没发现,就是这里边的清空状态会有问题,哎,为什么清空状态会有问题呢?你像之前我们做这个操作的时候,是只要排这么排序,排一次我当前窗口里边所有的这个呃,数据就都用完了,所以我就直接把它都清空就完了,对吧?但现在呢,我第一次窗口做了这个出发啊,我到点之后,然后一毫秒之后出发那个操作,把这个该输出的都输出了啊,然后排排好序了,但是之后呢,又来了迟到数据,他会在我之前的基础上,是不是有可能更改我当前的count值,也有可能更改我当时那个输出的那个排序的那个排名情况,对吧?哎,那所以我们这里边想要的是什么呢?呃,大家可能想到之前我们那个,呃更。
09:26
更新的时候你看到是当前的count值改变了,多了一个加一对吧,那但是后边的那个排名那张表里边其实它会有问题,之前我们是直接把这个例子的都清空了,那大家想是不是当前如果要是之前是清空的状态的话,现在你新来的数据有哪些数据呢?是不是就只有当前有迟到数据的更新了,之后的那些URL的count值才有啊,啊对吧,比方说之前啊,我一个URLURL1它的值是十,这个排第一,然后之后呢?呃,是这个URL2COUNT值七个排第二,那之后假如说我这个允许处理迟到数据,后面又来了一个URL2的一个访问,那大家会想到前边我那个聚合结果的输出是不是只输出了一个URL2这样的一个。
10:23
这样一个呃的一个结果啊,对吧,当然里面还有那个信息啊,还是同样一个window里面,这里面只输出了这个结果,那后边我们收集它,然后再去做排序的时候,是不是之前的这个七和这个十都已经被清掉了呀,是不是有这种情况出现,哎,那如果要是这种情况的话,我们这里边排序你最后只有他一个,那肯定他排第一啊,我们就根本没有起到更新这张表单的作用,只是更新了这一个count值而已。那在这种情况下怎么样去做处理呢?哎,当然大家会想到这里就不要去做状态的清空,对吧。
11:02
我们这个。就是在做这个操作的时候,大家就需要注意不要提前做状态的清空,而应该怎么样呢?哎,那有同学肯定就想到了,那我到底应该在什么时候才去清空状态呢?真正我们当前这个map不再去保持的时候,应该是什么时候呢?那就应该是等到一分钟之后,我们不是定义了那个一分钟的处理允许处理迟到数据的时间吗?等到一分钟之后,真的所有的数据都已经处理完了,窗口也关闭了,对吧?窗口也都已经清空了,这个时候是不是后边我们做process?方式这个操作的时候,这里面的这个表单状态也就可以清空了,因为再也不会有数据来了嘛,对吧,前面我们那个窗口也再也不会有数据更新了,所以这个时候再去清空才是比较合理的,所以基于这种考量的话,我们在前面呢,不光要注册一个,就是window and加一的一个定时器,还应该注册一个,哎大家想想是不是这里边我定义一下啊,另外注册注册一个定时器,哎,什么时候出发呢?一分钟之后触发,这时窗口已经彻底关闭对吧,彻底关闭不再有,呃,聚合结果输出,那这个时候就可以清空状态了,对吧?清空状态,哎,所以这里边。
12:44
我们定义一个,大家看这就是多个定时器啊,呃,Timer service,然后同样还是event time,那我现在要的是value.window and,再加上一分钟,一分钟那是6万毫秒对吧?啊,60秒,6万毫秒我们在这里把它定义出来啊,那接下来我们怎么样去判断这个东西呢?那这里只有一个on time方法,我到底怎么判断是当前是这个我定义的要去输出结果啊,就是做排序输出的这个定时器发了,还是说60秒之后,一分钟之后要去清空状态的这个定时器出发了呢?哎,这我们就要判断定时器了,根据什么判断定时器,我们之前说过,定时器的唯一区别就是时间戳对吧?哎,那这里边不是有time step吗?你根据time stepmp来判断它,如果是温度加6万的话,那就说明已经到了这个清空的时间了,对吧?哎,那这里面有一个问题是,你在这里面并没有这个。
13:44
里啊,我怎么知道这个是什么呢?我这里只能拿到这里的呀,哎,所以这里边我们还要大家看到这里面有一个操作啊,就是判断判断定时器触发时间,如果已经是一分钟之后,那就是窗口结束时间。
14:13
一分钟之后,哎,那么那么清空状态,所以这里面我们的判断直接就是if time stamp,就是当前这个定时期出发的时间戳,对吧,Time stamp,如果它等于窗口的那个按时间加上60秒的话,对吧,6万毫秒的话,哎,那但是大家想我这里面没有,呃,简单的一个想法是,我当时如果要是能保存下对吧,把当时那个保存在一个状态里面的话,我现在直接取那个状态就可以了,但是我没保存,没保存能不能拿出来呢?其实也可以,因为我们现在还有ctx有上下文。上下文里边它有一个东西是可以获取当前的current key,对吧?那当前的current key是什么呢?我们当前的这个key process function,它就是按照window and来做的分组,大家还记得吧?哎,就是按照window and做的分组,所以我这里边只要上下文里面能拿到当前的这个K,那其实就是拿到了window one长整性对吧?哎,所以直接加上6万,如果说现在它相等的话,那直接清空这个page will count map,直接clear,对吧,把它清空。另外这里面如果完全清空了,接下来是不是就什么都不要做操作了呀?
15:36
对吧,所以你看我为什么把这个判断放在最上面呢?就是假如说符合这个条件的话,下面啥都不要做了,我直接return对吧,直接当前这个操作直接返回退回就行了啊那那如果说这里边没有等于当前这个这个时间就是加一分钟之后的时间的话,那说明我们当前的这个出发,呃,应该是这个对吧?Window and加一的这个触发,或者是什么呢?呃,就是肯定是window and加一的触发啊,那或者就是后边来了那个迟到数据之后,哎,又接下来又触发一次,对吧?呃,有这样反复触发的一个状态,那接下来我们就执行后面这个流程就可以了。
16:14
啊,定义这个,把这个map里边的数据都拿出来,然后都放到一个列表里,后边做这个判断的时候呢,这也要稍微有一点区别,对吧,我们当前做判断的时候,那这里边就不是点count了,而是点下划线二对吧,就第二个元元素啊,那个count值做一个判断,然后后边这个操作其实也都一样,就是你不要用URL了,这个就只有下划线一和下划线二了,对吧。直接把这个拿出来就可以了,然后下来我们可以运行一下,大家测试一下,看效果怎么样啊。我们还是做一个分屏显示。好,我们把这个代码提起来啊,现在同时也起一个NC准备去输入啊,流失的输入数据啊,那我们当前输入的数据还是跟上一次类似的啊呃,这这一波的话,大家就不用去考虑这个当前,呃,就是顺序执行的这个过程了,对吧,这个肯定就是还是类似的这个过程了。
17:20
我们就主要来这个后边的数据,导致我们这个数据做更新的时候,这个状态啊是没有复上好啊这是两条数据了,然后接下来来一个这个四六的数据,这是一条乱序数据,但其实现在我并没有窗口关闭对吧?没有任何的窗口关闭,所以说这里边你直接输入是没有任何问题的,我还是推进一下这个时间戳五幺对吧,到这个时间点的时候,大家看到有一个聚合结果输出了,对吧?诶,当前这个就是29550这个窗口啊,这个以以这个时间五零作为结束时间的这个窗口,现在得到了一个统计结果,有两个当前这个present presentation了啊,然后接下来我们还可以继续给这个,呃,那这里边我可以比方说啊,我这个给一个不同的URL,比方说我给一个,呃,我我先给这个还是同样的URL。
18:20
啊,我们先看一下效果,又输入一条数据之后,诶,大家看到这条数据呢,是一个31秒的数据,所以50秒关闭的这个窗口,它还可以继续更新对吧?诶这里边就是尽管当前这个已经关闭,那就是已经到点了,输出了一次了,但是现在呢,等待一分钟时间内,它还可以继续更新,更新成三,然后下边呢,大家看到这个还有就是呃,就是像这个35秒啊,四十五四十秒啊,45秒啊,这些时段的窗口都可以去追加,相当于更新一下这个数据,这是完全可以的啊那另外呢,我们接下来就是再去给一个这个五二啊,更新一下当前的,那大家会看到前面我们这个数据就都有了一个输出,对吧?啊,这里面你看输出的就都只是一个啊,那当然有同学可能说,诶,你前面这个更新也只更新了一次啊,你假如说我们前面输的时候是其实中间给了好几个。
19:20
个迟到的数据对吧?哎,那如果给好几个迟到数据的时候,是不是就会出现这种情况,呃,就就会导致多个输入呢?我们再给一个31,那大家看到这里边这个变成了四了,对吧?呃,我这里再给一个31,你看这里变成了五对吧?呃,这个变成了五,这个变成了三,然后现在我们连续又来了两条迟到数据,现在再推进一下当前的这个WATER53,大家来看一看效果,大家看不会有这个别的这种情况出现,对吧?诶就不会出现刷屏的情况了,只有当前它一个,因为你是拿map这个去定义的嘛,所以里边当然只有一个值了,不可能出现别的啊,那另外还有一个问题就是说,诶这里边有一个,假如说我出现了别的一些,呃,比方说这里边还是啊,我来一个51的时候的数据吧,给一个别的urll。
20:14
好,当然51的话,不在我们当前这个,呃,50以前的这个窗口内,对吧,我们就把来一个49的数据,这个叫做present,不是present啊,那大家看到现在的话,50这个窗口里边的数据是不是又多加了一个一啊,哎,本身50这个窗口这个presentations热门度是五对吧?哎,这个时候现在它又啪追加了一条,那像之前如果你要是我们每来一次都把那个状态都清空的话,那大家知道接下来你如果再统计,是不是就只有它没有我们之前的那个PRESENTATION5了啊,那接下来我们再看一下啊,假如说我要想看到结果,那得推进当前的这个时间戳,所以我来一个五四。
21:01
好,大家看到现在是不是就是所有的当前所有的这个URL,它的统计值更新之后的统计值都在我们这个表单内啊,啊,这就完整的实现了我们当前的这个需求,既要保证我们可以处理这个迟到数据,可以在之前聚合的基础上去做进一步的叠加更新,另外呢,还要保证我们最后输出的这个就是top n的榜单,排行榜的这个信息是完全正确的,不能丢,也不能有重复的URL出现刷屏的情况啊,这样的话我们就解决了之前的这个bug。
我来说两句