00:00
那前面我们我们发现了,就是在这个处理的过程当中,最后的效果其实是有问题的,那我们总结一下,大家大家看一下啊,首先是这里边我们因为这里边列表状态的清空有一个clear操作,所以导致后边如果我们更新这个榜单的话,里边是不是就只有当前一个数据了,因为之前都被清掉了嘛,所以就只有当前最新的这个能够输出了,那另外可能大家有一个问题就是那为什么最新的这个数据还能够在之前的基础上叠加呢?为什么它不是一呢?为什么是七呢?啊大家要注意啊,这里边我们这里边输出的啊,保存的这个状态。这里面我们保存的是那个page view count,对吧?Page view count是不是这是当前,我们就是之前的那个窗口聚合得到什么结果,这里面就保存什么结果啊,我我后边去做这个清空clear操作的时候,清空的是kid process function里边自定义的那个列表状态,我能清空之前窗口的状态吗?
01:02
这肯定不能对吧,这已经是前后不同的任务了嘛,所以前面的窗口状态,他是不是只要允许一分钟处理这个迟到数据的话,那是不是就相当于不会清空啊,所以之前大家就会发现之前我的那个浏览量是六,那是不是当前这个窗口里边它的那个统计值还是六啊,还在那放着呢,只不过是输出了一次,当前这个presentations是六,Count是六,所以后边输出到我们这个key present function里边来。后边去做处理的时候,就是在这里把它,呃,就是at定这个list state,然后注册了一个定时器,对吧,后边我们处理的时候是不是定时到到时间的时候把它拿出来排序的时候,看到浏览量它就是六,后边我们是不是就把它清掉了。清掉了之后,其实窗口里边还是六对吧,那接下来来了一个十道数据,你说窗口里边去做叠加,是不是还可以叠加,叠加出去一个一个七啊,所以窗口是不是就又会输出一个page count叫做就是当前presentations,然后count是七啊。
02:08
所以掉到这儿来之后,是不是又艾特进来了,只不过当前这个例子是不是空的?Cool list,然后ADD了一个PRESENTS7ADD了一个这个数据对吧,所以最后输出是不是就只有它这么一条啊,这个大家就比较好理解了,那我们想到这个应该怎么去改进呢?因为你看到如果说这里边有更新的话,之前我们是不是就不应该这么简单粗暴粗暴的把它直接全部清掉啊。而是应该怎么样对大家想到,而是应该就是我们前面其实应该做一个类似于判断,假如说当前的数据在里边已经有的话,那是不是不是要直接ADD进去,而是直接更新之前的那个那个对应的那个count值是不是就完事了,那这大家就想到我这是一个list list如果要做这个判断的话,是不是还有点麻烦啊,我得直接那个循环便利啊,判断当前的每一个那个URL到底是什么,大家能想到我用什么一什么样的一个数据结构就更容易实现这个功能嘛,对,自然我们就想到了,我是不是用一个map方法就更容易实现这个功能啊,所以接下来就是用我用一个这个map state就可以更好的实现这个功能了,所以接下来我们的操作就是要把这个k process function做一个改良。
03:26
这个改良的方式非常简单,就是不要把它保存在一个例子里边,而是保存在一个map里边,所以接下来我把这个注掉。下边我要定义的是一个map。啊,那map states的话,大家知道它里面保存的是k value,那当前我到底保存什么呢?其实就是一个URL,一个count值是不是就完了,那甚至之前我们都想到过,就这里边,其实我不需要把完整的数据都保存进来,因为里边的那个window是不是没用啊,对吧,我其实想要的就是UI,就是那个key和对应的,呃,就是当前那个URL和对应的那个count值嘛,所以我就干脆把它保存成这样的一个KPI,对就完了。
04:11
当前的key就是类型是string,当前的value类型是long长征形,后边我把它定义成一个page啊,这个还是定义成welcome welcome不要叫list state了,我把它叫做map state,对吧,重新定义一下,那后边这里也要注掉。接下来的定义过程跟上面其实是差不多的,对吧,等于get runtime contacts,现在是不是要get map state,接下来new一个map state script,大家看这里边类型是两个string long,对吧?后边我给一个当前,这个叫page count map。然后另外这里边的类型怎么给呢?大家还记得我们那个map的话,是不是后面应该给两个类型啊,一个是当前key的类型,一个是当前这个value的类型,所以是string.class另外还有一个长整型long.class这样定义是不是就没问题了?好,那对应的下边我们都要去做更改当前我这个数啊,如果要是来了之后,那就不是直接去ADD了,而是。
05:24
当前应该是page count map去直接put对不对,Keep value嘛,所以接下来这个put的过程其实也是直接拿到value点,以什么作为key呢?URL对吧?然后什么作为值呢?哎,当前的count对吧?直接把这个put进去就完了,后边还是注册一个以window and为标志,注册一个定时器,哎,那这里边问题就来了,大家会想到后边我去做操作的时候,这个比较简单啊,就这里边首先啊,接下来我这里边要拿的时候。我把这个拿出来,当前是这个配置count map,然后get,大家注意这个get出来拿到的就是直接我调get的方法的话,其实不对,这个直接拿到的应该是个啥呀。
06:14
这个get方法其实是要传一个K进去,Get一个值出来,对吧?诶它不是list了啊,大家回忆一下这个,如果说我要把当前这个map里边的所有键值对都拿到的话,那应该那应该传一个什么进来,是不是应该传那个entrance进来啊,对吧?啊,那当然这里边。我这个啊,把这个直接改一下啊。我可以直接把它叫做。啊,这个其实我还是叫page view counts吧,对吧,Page view counts,只不过它大家知道这是一个那个entries的那个表达就可以了。K。然后接下来大家就知道了,那我下边的这个comp competitor是不是也要做一个改变啊,这里边我要我直接把这删掉吧啊。
07:07
里边需要去实现的这个东西需要怎么写呢?是不是里边应该是一个entry类型啊,map.entry类型,然后接下来这里边如果我要去做判断的话,那就应该是if啊,这个还是一样的啊,OE点是不是我应该是get value啊对吧?取当前的值出来,如果它大于o2.ygetvalue的话,那么就return负一对吧,如else if,如果O1.getvalue小于o2.getvalue的话,那么就return正一,最后else return0。这是不是就是我要做的这个操作啊,对吧,我转成这样的之后就就要用这种方式了啊,Get他的这个key和value,然后接下来这个最后做输出的时候,这个也很简单,那就直接用当前的这个page view counts,这里边是一个一个的那个K,对,这是en对吧?哎,那这里边最后得到这个结果也不应该是page view count,我我我把这个copy一下啊。
08:14
我们还是直接让他自动推断出来就完了,要不然这个还麻烦一点啊。大家看我直接拿到一个这样的entry,然后接下来那其实就是把这个当前的里边什么拿到,这是不是key拿到,然后value拿到就完事了,就这样对吧?哎,这就是我们整个这个代码更改的过程,当然大家会想到了,之前我们这个要给一个这个clear,主要就是因为最后还要清空状态,对吧,你不能让我们这里边的这个状态一直保持着呀,那大家想一下,现在如果我们想清空状态的话,怎么清空啊。大家想我现在应该是什么时候,当前这个就不再更新了。诶,那大家想这是不是跟我们那个窗口应该要一致啊,窗口这里什么时候就不再输出结果了呢?更新结果了呢?是不是一分钟之后啊,他等到一分钟之后,是不是当前的这个窗口就真的关闭,然后清空状态,再来的数据就只能扔到特殊物流里边了,当前的这个窗口聚合就不会再去更新了,对吧?那所以我们当前的这个是不是也就应该是在一分钟之后再去清空这个所有的状态啊,大家想这个一分钟之后清空状态这个操作怎么做呢?哎,大家想这是不是我就再去另外注册一个定时器啊。
09:35
有这样的一个想法啊,就是我在这儿注册定,哎,不是在这儿啊,这是我们的open生命周期,在process element这个方法里边,是不是我可以另外注册一个定时器,对吧?Register time timer,我要用的是value.get window and,拿到这个时间,然后再加上一分钟之后,那当然这个是时间戳嘛,所以是60 60秒乘以1000,这是不是毫秒数啊?所以大家注意啊,这里边是注册一个一分钟之后的定时器,用来清空状态。
10:18
那对应的后面这个on timer触发的时候,大家会想到,那我这个怎么判断呢?我怎么知道你到底是这个就是窗口结束时间加一毫秒,我要排序输出了呢,还是说我现在一分钟之后了,要去清空状态了呢?是不是用这个,我们说定时器是不是就是按它的时间戳来区分的,所以我是不是按照这个时间戳去做一个判断就知道了啊,所以接下来我们这里边首先要做的操作,其实是先判断是否到了。呃,窗口关闭清理时间对吧?啊,那么如果是。直接。
11:01
直接清空状态返回,大家想如果要是到了这个一分钟之后,是不是下面这些操作都不要做了呀?对吧,你当前如果这个出发的这个时间点是一分钟之后的那个时间点的话,这也不是因为我们那个,呃,数据来了之后又出,又注册一个定时器,等着要去重新更新那个榜单,对吧?那所以后面就直接就return啊,后面啥都啥都不要做了,所以我就直接做一个判断if。Time stamp当前的时间戳等于,等于什么呢?一分钟之后这个怎么判断,那我是不是还是得拿到当前的那个window and呀,那当前的window and从哪里拿呢?现在已经没有value了,哎,但是有ctx对不对,有上下文呀,上下文里边我get current key,这是不是就是当前的那个window and呀,啊,所以我直接用它加上60乘以1000,这是不是就是当前的那个值,这里大家还要注意一下get current key get出来的类型本身是不是就是当前key的类型啊,因为我现在前面我们定义的时候用的是这个方法,引用的这种定义是不是当前K就是一个长整形,所以我这儿直接加没毛病对吧?诶你如果要是本身是ta类型的话,是不是还得把它那个再再获取出来啊,对吧,就是把对应的那个刚才那个长整形只要拿出来才行啊,所以我就直接这么判断就完事了,这里边我做的操作就是直接把这个状态清空,另外。
12:34
是不是我可以直接return了,对吧,直接return就完事了,啥都不要干了,那如果不是到这个关窗,真正一分钟之后的这个清理时间的话,我再去把下边的这个步骤做一个。就是说明如果不是清清空时间的话,说明这个定时器是来了数据之后注册的,对吧?啊,所以接下来我是要做具体的这个排序操作的啊,那接下来就该怎么做怎么做就可以了。这就是一个完整的改进的过程,接下来我们把这个代码执行一下。
13:05
来看一看。测试的效果怎么样啊,是不是可以达到我们前面想要的目目的。好,当前这个哦,我把前面这个要关掉啊。接下来我们这个又运行起来,那么在这个代码里边还是啊,我把这个做一个分屏显示。给大家看的清楚一点啊。呃,啊,或者这个我们数据都在里边,我就直接把这个放在这就可以了,接下来一条一条输入啊,还是这个流程,具体再看一眼二幺,呃,这个49秒,25分49秒第一条数据没有任何问题,50秒第二条数据没有窗口输出,还是只有一个数据的输出。到51秒的时候,因为w mark1秒延迟,现在是不是有窗口结果的输出了,对吧?啊,这个AJJ已经有输出了啊,然后52秒的时候,当前就会有water马继续推移,是不是有那个榜单输出了,对吧?这里边浏览量一啊,这个没有问题,然后接下来是下一个窗口是55秒的时候,到了它的结束时间,那这个数据来的时候,当然它还没有water mark没到,所以说不输出56秒数据来的时候,现在watermark到55秒了,输出了一个统计结果,对吧?呃,55秒结束了,这个窗口现在有四个值了,呃,然后接下来我们继续给这个另外的一条数据啊,Present,另外一个K不同的URL。
14:37
然后呢,我们把这个watermark也更新一下,得到55秒窗口对应的那个榜单。大家看接下来我们看到55秒只有这个presentations对吧,只有这个四啊,因为我们刚刚输入的这个是属于60秒了嘛,60秒的那个数据啊,接下来直接推进water,把这个60秒的窗口也做一个输出,首先零一的话。
15:00
这里边是不是有两个AJ啊六二这个都没问题,然后接下来再推进一下这个WATER02的时候就可以看,诶no,好吧,输出这么多结果我们输输输错一条数据,这里边直接给直接给重新来了,对吧。我们重新看一下啊,接下来的话,大家大概知道这个是怎么回事就可以了,比方说我这里边首先还是先给一个49,哎,那我这个就不重复去输了啊,我直接给一个52。我可以把这条数据多输入几条,大家看这里边已经有一个一了,对吧,把这条数据多输入几条,那接下来如果我们当前55的这个当前的这个窗口,如果要想关的话,给一个56啊。大家就看到它的这个count是四,这个没毛病对吧?啊,这个已经有了啊,然后接下来我们给一个present的这个数据。
16:00
还是56。然后再来一个这个。57。大家看到前面我们输出了这个55秒的这个数据啊,这个没问题,就只有这个presentations,然后接下来如果我们再来一个。26分零一秒的数据的话,这个时候会把我们60秒关的这个窗口啊,就结束的这个窗口统计的这个值都输出五和二,那接下来如果说我们再把这个watermark再更新往前推进的话,是不是就会输出榜单了?这里二十六零二对吧,大家看到这个五二没没毛病对吧,那关键现在就在于,如果我们再来一条迟到数据。46秒的数据会怎么样呢?首先之前它属于的窗口,本来已经结束的窗口,对吧,但是因为我们允许一分钟处理迟到数据,所以它还是要更新一下,之前这个50秒变成了二,55秒是五,56秒变成了六。
17:01
然后接下来我们关键是要把榜单更新,榜单更新,那是不是需要把这个沃马继续退役啊,啊,这就是我们说的啊。如果不更新这个water mark的话,后边相当于我们的任务就时间没有进展,他就不会去触发这个定时操作,大家看一下当前的榜单,50秒只有一个对吧?诶26分整,也就是60秒的窗口,是不是相当于都保存下来了?六和二,而且都是我们更新之后的那个状态对吧?好,那对应的我也可以比方说呃,当前这个present不是也有对应的这个数据吗?我再来一条这个56秒的这个数据。大家看这个present来了之后,他自己是不是就更新成三了,然后接下来如果我们想看到这个榜单里边的更新的话,那是不是还应该要把这个对应的时间戳了再更新一下呀。我这里边把这个更新成零四,直接来一个数据。
18:03
大家看当前是不是就变成了六三啊?没问题对吧,哎,之前的这个presentations还是六,这个present就变成了三,那假如说我这里边啊,来一条数据,比方说我们是这个,呃呃,就是比方说啊,我们回退到这个25分57秒。但是呢,我这里边给一个其他不同的另外的一个URL,比方说我就叫这个对吧,把这个定义出来,给一个这个迟到数据的话,大家看是不是它只有一个呀,那这个他能直接追加到我们之前的这个榜单里面去吗?可以的,因为我们这个窗口没关啊,对吧,窗口里面状态是不是都有啊,所以你现在来了新的数据,是不是我也可以单独去统计一下呀,这没问题对吧?所以接下来我可以把这个再改成看要看到榜单得推进water mark,那现在应该轮到二十六零五了,对吧,我再把这个更新一下。
19:07
大家看到当前是不是榜单就变成了631啊,哎,这就是我们这个处理迟到数据的一个过程啊,呃,这个就是大家可以下来之后好好的把这个东西再具体的再测一测,呃,这里边还涉及到另外一个问题,就是说那可能大家会有思考,就是说什么样的数据才会丢到这个测试输入流里面去呢?就是我当前这个已经所有的窗口都关了,对吧,这个数据已经收不进去了,哎,那就会丢到这个特殊流由那大家要注意就是说比方说。我当前啊,最早的那个窗口是10:25:50,那大家说他什么时候就这个窗口真的彻底的就关掉了,是不是一分钟之后,就是10:26:50的时候,这个窗口就彻底关掉了,对吧?诶那比方说我直接把这个时间先往前推移一下啊,我这里边可以直接给一个10:26:51,大家想我给一个这个water mark的话,大家想现在我给一个51,如果我给一个50的话,这个时间是不是还没进展到那个时候啊,对吧?但是大家会想到我给一个50会有什么效果。
20:21
这是不是相当于automark有一个跳变,中间是不是很多窗口都会关啊,哎,所以这里边我可以直接给一个啊,大家看到这里边是不是输出了很多这个窗口聚合的结果啊,对吧,这都是这个统计我们当前的这个值啊,然后这个pre,大家看这个pre,因为输入了好几条嘛,榜单里面也有它的,也有它的这个,呃,一个位置啊,643这样的一个统计啊,那么接下来如果说大家想一下,如果说我接下来给一个数据的话,那我们要测一下啊,什么样的数据就完全收不进来了呢?这个这个大家要注意一下,不是说就假如说我再给一个比方说10:25:46的数据,这这个就呃,就是当前这个窗口就就已经收不进来了,对吧?呃,大家会想到现在这个26分50秒的话,这会儿还没完,就相当于我是要把这个更新到51秒,这个时候才才有效,对吧。
21:17
我得把这个water再推进一下,然后这里边的话,大家看又多了很多这个AJ输出,这是因为哎,这是不是就是最新的这个窗口,26分50秒要关的窗口,这个相当于要输出一个结果了啊,对吧,这个大家是能想到的啊,那假如说这个时候我再给一个十点。呃,我我给一个我们那个数据啊,10:25:46的数据的话,大家想一下,它会直接输出到测输流吗?会不会呢?他看不会他直接更新窗口了,为什么会直接更新窗口呢。因为大家想它属于的窗口只有这个10:25:50要要关的这个窗口吗。
22:02
不是对吧,它是不是它属于很多个窗口对吧?它同时你看10:25:55要结束的那个窗口,它是不是也属于啊,10:26结束的窗口是不是它也属于啊,那那些窗口是不是都没关啊,所以大家要注意啊,只要它还有属于的窗口没没真正的关掉,没真正的清空,那这条数据它就不会被丢到特殊流里面去,因为我们认为相当于你这个数据还能处理对吧,还能够叠加吗?啊,那什么样的数据就真的会被丢呢?那就是对一个窗口都不属于,就是它属于的所有窗口现在都已经过了一分钟,延迟时间都已经关了,那这样的数据就真的会被丢了。那所以现在我们要找一个什么样的数据呢?那就是它属于的最后一个窗口是10:25:50。大家想想是不是这样,现在因为我们watermark已经到了10:26:50了,那10:25:25分50秒的那个窗口是不是彻底关了,那如果一个数据它属于很多个窗口,最后一个是10:25:50的话。
23:18
诶,那这个数据就真的要被丢了。那所以大家想一下,什么样的数据满足这样的条件呢?我们是十分钟一个窗口,对吧,这是从15分50秒到这个。到这个10:25:50,所以那大家自然就会想到,那这个数据它是属于10:25:50这个窗口,它是不是就不应该属于10:25。55秒这个窗口啊。大家想是不是它不能属于接下来这个55秒,接下来要结束的这个窗口,但是它应该属于25分,是不是45秒的这个窗口啊,对吧,它应该属于这个窗口。
24:01
诶,那大家想一下什么样的数据满足这个条件。啊啊,那有同学说,诶,那10:25:46满足,呃,但之前我们说了后面这个窗口,这可是10:15:50开始的,对不对,我呃那个55秒开始的,你那个10:25:46肯定还是属于下面这个窗口的,对吧?所以如果要是不属于下面这个窗口的话,是不是我要刚好让他的那个起始,就是当前的那个时间要部署就是在这个10:15:50~10:15:55之间啊,在他俩之间的数据是不是就相当于只属于上面这个窗口,不属于下面这个窗口啊。所以我可以给一个十点。大家想一下15分,比方说51秒的数据。它属于下面这个窗口吗?是不是已经不属于了,收不进来了,对吧?它属于上面这个窗口吗?属于对吧?诶,所以上面这个窗口是不是就是这个数据对应的最后的一个窗口啊,再往前它还属于,再往后就不属于了,所以接下来我们给一个数据是10:15:51。
25:21
比方说这个presentations对吧,给这输入一下,大家看这个是不是直接就输出到这个late里了啊,这就是这个最终迟到数据的一个特殊物流输出啊,大家如果要处理这个数据的话,那是不是就得单独的把它拿到另外的再做一个单独处理啊,啊和之前那个聚合的结果做一个合并啊,那有可能我们做一个批处理,离线批处理去搞定这件事,这就是完整的处理迟到数据的流程。
我来说两句