00:00
这个代码的基本功能我们已经实现了,但是大家想一想,这个过程里边有什么问题没有呢?这整个的实验过程基本上跟前面的那个,呃,实时的热门商品统计基本上就是一个翻版差不多,整体流程都差不多,但其实大家发现了,这里边我们最大的一个问题其实是在于当前的数据是不是完全不一样啊,而且这里边我们的数据是一个乱序数据,我们前面的处理方式是什么呢?非常简单粗暴,是不是直接给了一个一分钟的water延迟啊,那大想一下,在实际处理的过程当中,直接给一个一分钟的water mark延迟,这个合理吗?这完全不合理对吧?哎,你直接上来之后就延迟一分钟的话,那其实大家想一下,特别是想一下我当前的这个场景啊。我们输出的频率是不是五秒钟就输出一次呀,结果你输出的是一分钟之前的结果,那这个延迟就有点儿太高了,对不对?对于我们当前这个场景来讲,这完全不可接受,那大家想一下,我可以怎么样改进这个代码呢?
01:05
啊,大家会想到,那我是不是可以利用我们之前讲过的window里边,就是我们讲这个处理延迟数据是不是可以有三重保证啊,可以在最外边直接来一个这个watermark全局的延迟,但是这个延迟我们说实际项目当中是不是一般是要小一点啊。因为大多数情况下,真实的这个因为乱序导致的,就是因为分布式架构,分布式传输导致的,这个延迟其实不会太大,对吧,一般可能就是几十毫秒啊,最多上百毫秒,那所以这里边我们一般可能就可以给一个比较小的延迟就够了,所以比方说我这里边,假如说这个我没有去统计过那个数据里边啊,我们当年的测试数据可能不止啊,那个延迟都比较高,那比方说我认为大多数的数据都能够在一秒之内就已经把他这个延迟给搞定,那我是不是在这里直接给一个一秒就可以了。
02:02
Watermark,先延迟一秒就可以了,但是大家想到假如说我这个一秒还是有些数据hold不住呢,那后面我统计这个窗口数据的时候不就乱了吗?对吧,有些数据就丢了嘛,刚才那个count值就少了,那怎么办呢?对后边是不是窗口这里面有后边的第二重第三重保证啊,首先窗口里边是不是可以给一个允许迟到处理迟到数据啊。大家还记得这个吧,Aloud lateness,这里我是不是直接就可以给一个更长一点的,比方说我给一个一分钟的延迟时间啊,所以我直接给一个time.MINUTES1。这样的话,我得到结果是不是就应该本身water mark是延迟一秒钟,那就相当于一秒,只要是等待一秒钟,很快的就会输出一个近似正确的结果,然后呢,接下来是不是在这个输出的过程当中,应该是一分钟之内,我还可以继续等待来一个数据,就把之前的窗口要更新一次啊,对吧,就不停的更新之前的结果啊,这是这样的一个效果,然后接下来如果说还有这个漏网之鱼的话,那怎么办呢?
03:15
诶,那当然就是,是不是还有测输出流啊。这个。当前大家看到这个,我可以去允许这个lo listen啊,允许这个迟到数据,然后还可以直接把它输出到side output。呃,这里边会有一个问题啊,就是大家看这个。这里少了括号,对这里边定义的这个没有定义完整啊,接下来我就可以直接把漏网之鱼直接输出到这个特殊这里面去,当然这里边我需要一个auto tag对吧?所以这里我可以把这个auto tag直接定义出来。呃,我们这里定义一个侧输出流标签,你有一个output t output tag。
04:09
里边的这个数据类型应该是什么什么样子呢?窗口里边的特殊物流是不是必须跟原始的这个窗口里边的数据类型是一样的呀,所以我们当前本身就是那个阿帕奇log event类型,现在是不是还必须是这个呀,输入的数据类型对吧,里边给一个那个名称,比方说当前我叫做。把它定义成一个啊late t,这是我们前面的一个定义,然后接下来是不是这只要传递一个这个tag就可以了,类tag放在这就完事了啊,那大家知道接下来我怎么样把这个拿到呢?对应的这个数据怎么样拿到呢?那是不是要基于当前的这个window h stream?啊,大家看我前面就没有把这个写成data stream对吧,因为是不是data stream没有对应的那个get特殊流的那个方法呀,必须得是simple output stream operator才可以有那个方法啊,那接下来我要获取到它的话,其实是应该在这里啊,我可以在前面把这个对应的window a j stream也做一个打印,这个叫AJ。
05:21
然后另外基于它是不是可以get side output,只要传进这个t tag来就可以了,哎,做一个打印,这个就叫late对吧,这就是我们当前啊能够拿到的这个所有的数据,如果想看到他的信息的话,也可以另外放在测试物流打印出来。那当然了,这是最后的结果对吧?啊,前面我们也可以每来一条数据之后啊,这个data stream我也可以直接方便做测试嘛,直接做一个打印输出,看到当前的这个data一个一个来。那接下来我们如果要是想测一下这个过程的话,肯定就不要直接读取文件,直接一下子都读进来了,对吧?哎,我们这里面更好的方式当然是。
06:07
当然是从流逝数据里边去读取,从卡夫卡里面去读取对吧?诶,那这里边我不用卡夫卡测了啊,简单一点,我直接从so文本流去做一个测试吧,Env socket,大家还记得这个吧,Socket text file对吧?这里边local host 7777,随便定义一个端口,把它起起来好,那所以接下来我就可以去做一个测试了,Bash nc启动一下7777。好,我把这个分屏显示在这里。然后接下来把这个代码启动。好,那同样的,我们还是拿一些这个测试数据啊,给大家来做一个说明,我这里边直接用这个。Network flow test里面的一些数据。
07:05
好,我把这个放在这里啊。接下来我们把这个屏幕分配好。诶,当前这个报错了,我们看一眼应该是少了东西是吧,我们看一眼啊。哦,这里面大家看到就是这个output tag,我们少了那个画括号对不对,因为当前这个。在在找一下我们那个output tag的定义啊,是在这里对吧,这里边大家知道,如果说呃,就是你看我这里边如果要是说直接给这样的一个定义的话,它其实是推断不出对应的这个类型来的,对吧?所以说我们要求必须是写成匿名内部类的这种方式给一个划括号,这样的话,它可以推断出对应的类型,好,那接下来我们把这个做一个。运行。执行起来。
08:05
好,还是放在这个下边啊,大家能看得清楚就可以了,然后接下来我们就一行一行去输入这个数据,首先大家看啊,前面这条数据是这个IP啊,然后user这个也没有,这个我们都不管了,我们最关心的其实是URL和对应的这个时间,对吧,大家看当前时间是10:25:49 presentations啊,这里面来了一个一条数据,好,我把这个数据做一个输入。大家看自然这里边就有了一个data的一个输出,对吧?啊,就是我们这里边是把它这个完整包装好这个输出的啊,有一条data,当然了,现在没有任何的窗口结果输出,大家知道这个当前窗口应该是在什么时候才会输出结果啊。我们定义的那个窗口,滑窗是十分钟的窗口长度,然后每隔五秒就要滑动一次,那最后输出的频率是不是五秒钟一个,五秒钟一个啊,而且大家知道对,当前应该是五的整数倍,对吧?所以那大家想49它属于的窗口最先关闭的应该是哪个,到什么时间点就应该关一个窗口啊,50是不是就应该关一个窗口啊?哎,所以接下来我们输一个50进来。
09:16
50的数据进来啊,这还是presentations。大家看这里边又多了一个data对吧?呃,这里边是50,大家看这个50秒的这个时间说就放在这儿了啊啊它它是一个21950000的话,这是因为毫秒数嘛啊,这个看的很明显啊,但是这里边没有关关窗,为什么没有关窗呢?对walmark是不是有一一秒的延迟啊,所以这里边我们必须要有五幺的这个数据来了之后,是不是才有可能会关掉之前窗口啊,所以来一个五幺的数据。同样又多了一个data,接下来大家看是不是多了一个AJ啊,多了一个前面我们窗口聚合的结果,这里边URL presentations,然后当前的这个窗口结束时间是五零,然后count是一,为什么是一?
10:05
诶前闭后开是不是5051都不属于啊,因为它50就关了嘛,50这个是不包含在里边的啊,所以这里边只有四十九一条数据,诶那这里边问一下大家,为什么我们这里边只得到了这个聚合结果,没有对应的那个排序输出的那个榜单呢?Top n的那个结果为什么没输出呢?哎,是不是后面我们又要延迟一毫秒收集齐所有的数据才输出对应的结果啊,所以大家看到这两个时间不一样,对吧。前面的那个窗口的结果是不是先拿到了,然后后边我们是不是用一个那个process方式去做处理,是再过一毫秒延迟之后,等到所有这个AJ的结果都收集齐了,然后才排序啊。所以大家看这里边就是假如说我有多个这个对应的,呃,对对应的这个URL的话,是不是它也同时都会在这里边直接A都输出。
11:03
但是排序是要等到一毫秒之后才排对吧?啊,所以那这个一毫秒怎么体现事件时间是不是相当于还是得这个watermark前移啊,对吧?继续啊,这里边得有这个来一个五二的数据,大家看又来了一个data,下边是不是就多了一个这个排序的这个窗口结束时间的输出啊,这里边结束时间是25分50秒,然后只有一个这个presentations浏览量一对吧,他排第一直接放在这儿了啊,这个看起来没什么问题啊,然后接下来我们想测一下这个迟到数据。那大家想,假如说这个时候我再来一个四六的数据的话,会怎么样呢?这里边会怎么样?啊,那那首首先我先不要不要测这些啊,假如说我就直接往后看,大家想下一个要关的窗口应该是什么时候。
12:00
那应该是55秒结束的窗口对吧,但那假如说我给一个55的话,其实还不会关,大家想到这里边我直接给一个55的话。这里边是不是大家看到我,我直接只有一个data输出,对不对,只有一个data啊,这里边并没有对应的那个结果,我是不是必须要等到56来的时候,当前的water mark相当于才涨到了55,这个时候才会。才会关掉下一个窗口啊,大家看这里面是不是就AJJ就来了对吧?啊这这这个就又多了一个这个呃窗口它的count是四,为什么是四呢?是不是前面不包含5556,前面这个49505152都有啊对吧?啊,这个是大家能想到的啊,这个没有问题,如果是再下一个窗口的话,那自然是不是就是六幺来了之后才才行啊啊六幺的话,那其实应该是10:26零幺了,对吧?啊601秒对不对,应该是这样一个一个关啊这里面为了大家方便测试,我可以再多一点这个其他的数据,比方说我再来一个。
13:07
56秒的,这个叫做present。大家看我来一个这个数据。那这个数据是不是也因为还不涉及到这个窗口关闭嘛,它是不是也属于这个,就是接下来那个六零,也就是说26分整要关的那个窗口啊,所以它它还可以继续收进去,对吧,比方说我再来一个啊,比方说这个是57。再来一个数据。大家看这里边,诶,大家看这个57如果来了的话,为什么前面这就多了一个输出呢。这是不是,哎,这就是我们说的当前的这个water mark是不是前进了呀,Water mark前进了之后,之前我们只是输出了一个AJ的这个统计结果,要输出这个榜单的这个排序结果,是不是必须要这个watermark再再往前进一步啊,因为有一毫秒延迟嘛,所以前面你看这个25分55秒的这个结果是到这儿的时候才真正输出了。
14:06
那接下来我们想关这个26分整的那个窗口了,那是不是要给一个。那就得给一个26分是不是零一秒的数据了,对吧?呃,当然这个零一秒的数据,这个无所谓啊,你你给什么都可以,比方说我就只给一个一个杠,这也算一个URL对吧,这无所谓的啊,我就是要推进一下当前的这个water mark,然后我把这个一推进。大家看这里面是不是同时输出了两个AJ啊,为什么有两个AJ?这大家是不是看不太清楚,我把这个稍微往上移移一下啊,大家看这里边直接输出了两条HG。一条是presentations的,一条是present的,因为我们当前这个窗口里边,是不是现在有两两个不同的URL都有数据了,所以我统计的时候是不是分不同的key做统计嘛,那是不是最后是不是两个都得有一个count值输出啊,那所以大家看就是这样,理论上来讲他们几乎是同时输出的。
15:12
那只不过这里边我还应该是有一个先后对吧,先来一个后来一个啊,那大家看到这里面就是presentations count是六,那present count是二。那大家想最后我输出榜单的话,是不是还要在这个零一基础上再去加一更新一下这个当前这个water呀,对吧,当然这个water就无所谓啊,你直接给。给哪个URL都可以,我只要是当前这个时间变了就行,对不对,因为我提取watermark是在一开始没分组之前,是不是就已经做了这个,呃,提取时间出和water的工作啊,所以我们尽量接近那个数据源那边啊,所以接下来更新一下当前的这个water mark,大家看是不是输出了。当前是不是排名第一的是这个六,排名第二的是这个二,对吧。
16:00
这个过程都完全没问题。我们现在想要的是。测试一下,哎,测试一下当前这个迟到数据,那大家想假如说这个时候啊。我下边又来了一条数据是。就给大家看一眼,又来了一条数据,是四四十六秒这个时候的数据,那大家想他现在还能处理吗?这个46秒的数据,大家看我现在已经是26分零二秒了,对吧,那你现在又来了一个25分46秒的数据,这明显是迟了好久了,迟了十几秒了,对吧?那现在我如果要是他如果来了的话,那大家想我当时不是允许那个alo late1分钟嘛,那现在是不是还应该在一分钟等待的时间内啊。那大家就会想到,它应该属于哪几个窗口呢?哎,是不是前面我们已经统计输出,现在已经统计输出几个窗口了。最前面看的话,50这个窗口是不是已经关掉了,对吧?呃,大家想我们现在其实不是关掉了啊,真正意义上是不是只是做了一次计算输出了,但其实还没关对吧?呃,他还做,还在放在那儿等着那个迟到数据呢,要等一分钟呢,所以50秒这个窗口这已经输出一次了,然后55秒这个窗口是不是也输出一次了,另外接下来。
17:22
20,呃,26分整,也就是60秒的这个窗口是不是也输出一次了,那大家想当前46秒这个数据属于哪几个窗口呢?它是不是这三个窗口都属于啊,因为我们当前窗口长十十分钟呢,对吧,所以26分的这个窗口,它应该是属于10:16分到10:26,那你说10:25的时候,你当然属于啊,对吧,10:25:46当然是属于的,同样最前面那个十点。25分50秒关的窗口,它是不是应该是10:15:50~10:25:50啊,这个10:25:46是不是也属于。
18:06
所以接下来如果我输入一条这个数据的话,大家看一眼。它会一个什么样的效果?哦,这个我没有没有copy上啊。嗯,这个好像还是没有科比上。好,大家看一下。当前是什么样子,是不是他直接就输出了三个更新了三个聚合结果啊,我们输入了一个这个46秒的时候的这个数据啊,直接更新了三个结果,它更新的是哪哪哪些。是不是直接把我们五零的时候,当时不是只有一个吗。现在是不是不是在之前的基础上count变成二了。大家回忆一下之前这里边第一个这个550窗口啊。这里面是不是只有一条啊,接下来是不是直接count变二了,然后我们第二个输出的那个窗口,它是不是应该是四个呀,现在在四个基础上是不是看的变五了,然后第三个窗口。
19:11
26分的时候,Count是不是六,我们当前是这个presentations对吧,Count是六,那现在它是不是就变成了,是不是变成七了。大家看在之前的基础上是不是都做了一个更新啊,哎,所以这个是完全没问题的啊,但是大家发现它这里边更新是不是只能更新我们当前窗口的那个聚合那一步操作啊。他并不能更新我们后边那个榜单对吧。那后边榜单怎么办呢?那其实有同学可能想到这个不对呀,之前我们那个榜单做做这个输出的时候,其实我这里边的数据是不是每来一条数据,就前面我的那个聚合结果啊,是不是每来一条数据,我这里边都要去注册一个定时器啊,那你想我这里边如果是更新了这里的这个结果的话,难道不是这里边又要注册一个定时器吗?
20:03
大家想是不是我这个AJ只要有一个结果输出,这里边调到后面来之后,就会艾进这个list,然后去做一个注册定时器的操作啊,那我这里面注册的这个定制器,大家想注册的是什么?不还是之前的那个窗口结束时间加一毫秒吗?那大家想我现在的这个窗口当前的是不是已经根本已经超过了呀?完全超过了对不对,那为什么这里面没有直接输出呢?对这里这里就是大家要注意啊,我这里边如果是注册了一个已经过时的一个定时器的话,它不会直接去触发。那那他难道就永远触发不了了吗?那那相当于我这注册的,这不就没用了吗。其实也不是的啊,它会触发,它会在什么时候出发呢?会在下一次water mark更新的时候触发,因为之前我们给大家讲过water mark如果要是现在不更新的话,上游任务是不是就相当于不像下游任务在广播watermark了,所以大家看下下面我们做process的这个处理的时候,它其实就会发现我当前OL不更新对吧?不更新的话,它是不是相当于就不去处理我的定时事件啊?
21:14
因为大家想我处理定时事件,处理闹钟,是不是必须要是我的时间变了之后,我才去考虑当前有没有要出发的这个奥特曼出发的那个时间,对吧?你后面我们不是有那个on timer吗?On timer这里边肯定是必须要时间发生改变才会触发到这里的定时操作。所以这里边大家看既然已经过时了,那现在我要触发它的操作怎么办?对大家想到是不是我再往后推,推进一下当前的这个。对应的是不是就可以了啊,所以接下来比方说啊,我在这个基础上,当前的这个water mark是零二了,对吧,那我接下来要推进是不是就必须给一个零三啊,对吧,我给一个零三。
22:01
推进一下当前的water。大家看现在是不是就输出结果了,而且一下是不是更新了之前的三个榜单啊,但是大家发现了发现问题了,这里面输出好像有问题。你看我们更新这个榜单,50秒的这个结果。当时是只有presentations对吧,现在也是不是只有presentations啊,我们更新的只有它,但是它输出的结果是啥呢。是不是相当于他直接刷榜了呀。是把之前这个presentations自己的一个排名,这个二排在第一,然后自己的那个一哎排在这里边排第二,对吧,大家觉得这个这个合理吗。这个完全不合理对不对,哎,他这里边是不是就就不应该直接做这样的一个操作啊,对吧,你这里边之前的那个应该是清掉,我们这里边是不是应该是用这个新的,直接保留一个新的就够了。啊,那继续大家往后看啊,这个55秒关的这个窗口,是不是也是有一个这个刷榜的这个行为啊,后边这个60秒是不是也是这样一个行为啊,哎,所以大家会想到这个怎么样去更改当前的这个代码呢?
23:09
我是不是在这里边,其实前面少做了一个操作啊,就是我这里边输出结果之后,是不是相当于我应该把之前的那个例子就应该直接清掉啊,现在我的那个状态是不是一直保存着,一直一直没动过啊,所以你想我之前已经统计过一次的那个结果,是不是就相当于已经保存在这里了,对吧?这个presentations它的那个COUNT5,这个结果我已经保存了一个了,呃,保四已经保存了一个了,现在我又更新之后又来了一个五,那是不是就相当于我后边不是每来一个都艾特进去吗?那是不是相当于这个例子里边就多了一个数值啊,然后就变成了一个,呃,最后我去做这个排序输出的时候,就相当于把它全排到一块儿了啊,所以大家自然能想到我这里可可以加一句,就是当前的那个配置real count count list state要做一个清空,对吧,要做一个更新,哎,那这个时候大家想到如果我去测试一下的话,这个有问题吗?
24:09
这个有没有什么问题?我们先执行执行起来啊,接下来我就直接啊,直接测我们前面这个,呃,整个测这个过程了啊,我一条一条传一下,首先是这个四九。四九这个数据的话,这个非常简单,大家看到就是一条数据输出对吧,然后是5050,大家知道不会涉及到这个窗口的关闭,就是一条数据对吧,窗口还没有没有做计算的啊,然后五幺这个时候窗口做了一个聚合,一个窗口到达关关窗时间了,对吧,它不会真正关闭,但是会触发一次计算,统计出来是COUNT1,然后呢,到五二的时候,这里会有一个。对应这个结果的输出对吧?呃,这里边会把这个榜单啊,五零的这个结果COUNT1直接输出了,这个大家都知道,然后后边的这个到下一个窗口要结束的时候,五五。
25:08
这里还不会有结果,输出到五六的时候。是不是输出了呀,诶这里边是会有对应的这个结果,A j j count是四啊,就是五五要要关的这个要结束的这个窗口,Count是四,然后接下来如果我们想看到对应的那个结果的话。大家会看到就是我接下来再多一个另外一个K啊present来来一条数据,然后我更新一个auto mark会看到五五对应的那个窗口,对应的这个统计值四已经出来了啊,但是这个前面这个present这里边它属于的那个窗口是六零对吧,26分要关的窗口,这个我们还得往后看,所以是等到26分零一秒的时候这个要关。呃,就是结束的是那个六零时候的窗口,所以大家看到两个值对吧,COUNT6COUNT2直接放在这儿了,然后接下来这个结果要输出是不是得等到零二的时候啊,这里面又得做一个更新对吧?诶大家看这里边得到了六二,这个没毛病啊,然后接下来。
26:13
接下来我是不是来了一个迟到数据啊?四六对吧,大家看一下更新了之前的三个数据,这个还是一样的,Count变七对吧?呃,最早的那个五零的那个窗口count变二,五五的变555,呃,六零的那个窗口count的变七,这还是之前在我们之前基础上叠加了一,这个没毛病,关键是后边那个榜单怎么输出,榜单要输出的话,前面我们说了,是不是必须要再推进一下当前的watermark呀?来看一下这个结果。哦,当前这个是看起来没问题了,更新之后是不是只剩下这一个了,不会再刷榜了,但是大家发现是不是后边我那个也没了呀。这个大家理解为什么会出现这种情况吗?对大家想清空,我是不是并不是选择性清空了,是不是直接全清掉了,哎,所以既然你全清的话,后边我在更新的时候,我是不是相当于只更新了里边某一个值的那个统计的结果啊,但是我把那个例子里边的所有的那个URL对应的那个统计结果都清掉了,那当然是不是最后我就只能输出一个了。
27:24
那大家就觉得这个就不对啊,你这个确实更新之后,它是更新这个结果他是七没没毛病,但是我怎么知道他一定是排NUMBER1呢。你这么排的话,他是不是一定是NUMBER1啊。对吧?哎,那我更新之后,他有可能我更新的只是那个,呃,排第二的,排第三的那个出对不对,那我应该是把完整的榜单做一个输出,对不对?哎,所以这种方式其实还是有毛病的啊,所以这只是我们一个,呃,对于这个乱序数据的一个简单测试,那接下来我们还可以对这个代码做一个改良,继续做一个改进。
我来说两句