00:00
在这个过程当中,其实会发现可能会有一个问题,就是你你在这里边,我们这个统计其实频率很高,对吧?你在这里边每一个窗口长度是十分钟,然后隔五秒就要出,就要就要去,呃,就是输出一个窗口就要统计一次当前这个十分钟的值,然后呢,我们这里边给了一个water mark的延迟时间,我们用最大乱序程度啊大概的看了一下,拍脑袋给了一个,给了一个一分钟,也就是说你每五秒就要快速的输出一个结果,结果呢?呃,你当前的这个输出的当前的这个统计结果是延迟了一分钟的一个结果,哎,那大家会觉得这就有点莫名其妙了,是吧,那你你当前既然是要频繁的去更新,那说明我们对于这个当前实时性的要求非常高啊,你快速的要去看到最新的这个结果统计,而且是不停的啊,统计这个之前十分钟之内的数据,对吧,而你这里边呢,整体的延迟要有一分钟,那这不就自相矛。
01:00
顿了吗?那就感觉就没必要你后边要这么快的输出对吧?啊,那在实际应用当中,我们到底这个water mark给多少呢?而且我们之前说过,你这个其实不推荐大家给特别大的watermg延迟,为什么呢?因为你这里边延迟,我们后面做的很多触发操作,相当于都在等对吧?那就相当于窗口他都在那个数据都来了,然后他都在那儿等着,然后占据着内存状态都不能释放,所以这个过程其实对于我们系统资源的占用是一个,也算是一个浪费,如果说这里边我们没必要去给这么大的话,可以尽量给小一点,所以往往一般情况下,我们这里边应该怎么给呢?那那有同学说你这里面给小了不就不正确了吗?确实是这样的,所以我们往往是要把这个watermark和啊,就是前面我们说过的乱序数据处理,是不是还可以结合这个窗口的延迟触发机制来做一个整合啊,啊,所以这里边我们可以把这个窗。
02:00
口本就是本身的这个water mark的延迟设置的小一点,然后呢,在后边我们把窗口给它多开一会儿,多等一会儿,设一个相对比较时间长的一个等待的延迟时间,然后最后如果还有没有处理完的数据的话,我们再把它塞到这个测输出流里边去就可以了,这就是我们之前讲过的乱序数据的三重保证,那沃mark在做设置的时候呢,你最好也要服从这样的一个规则,对吧,就是我们说的啊,整体数据的乱序的这种情况,乱序的程度,它整体来讲应该也是符合一个正态分布这样一个标准的,那其实应该是在一段时间范围内,大部分应该可能比方说啊,我们真实的这个数据场景,应该是最大的乱序程度可能是这个,呃,一分钟60秒,那大部分的数据呢,可能他这个在一秒钟之内,正负一秒钟之内,大部分也就都到了,对吧,因为大家想这个你如果纯粹的。
03:00
这个网络延迟和这个因为分布式导致的这个乱序,它其实一般也就是几十毫秒几百毫秒嘛,啊,我们一般这个上网的时候,如果你要做,比方说玩一些这个即时战略游戏啊,那个延迟如果到几百毫秒其实已经很卡了,一般情况下我们那个网络延迟正常情况可能几十毫秒是比较正常的,所以这个可能一般在一秒钟之内都能搞定,那所以我这里边呢,就可以不要这么设置一分钟,我直接在这儿给一个一秒钟的watermark延迟,它能搞定大部分或者是一部分主要的这种情况,然后呢,哎,对于这种漏网之鱼,我在后边可以去窗口里边调用一个啊,就是可选的API allow lateness,对吧,允许它做一个延迟处理,那这里我再去定义一个一分钟,那家就会会想到,那是不是相当于就是整个这个延迟没那么大对吧,那整个这个所有的数据还都。
04:00
是,就是延迟一秒钟,我这边就直接去判断去处理了,那比方说我当前有一个,呃,九点钟的窗口,那它其实也就是延迟一秒,大概延迟一秒啊,Watermark到九九点的时候,我这边窗口就该输出结果计算一次了,对吧?只不过呢,窗口还不要关,我继续保持一分钟,在接下来的一分钟之内,诶,就是我一直等到water rock涨到9.01之前来的所有数据,我还可以在之前聚合结果上做叠加,做进一步的输出,对吧?啊,就是这个是常见的这种处理乱序数据的一个手段啊,那当然另外了,我还可以做一个,就假如说一分钟之外还有一些个别的漏网之鱼怎么办?Set output,对吧?做一个特殊输流,把这个list data扔到特殊输流里面去,这里面我们要定义一个output tag啊,它的类型是当前的这个数据类型,阿阿帕奇lo event,我们给一个标签叫做类,那后面你怎么样把它提取?
05:00
取出来呢,哎,注意啊,基于AJ去提取对吧?哎,它基于它转换之后的那个变stream去提取,所以这里边我可以定义一个就是当前的a j stream,呃,我可以先把这个打印出来给大家看一眼,对吧?这是AJ,我们把那个data stream也打印出来,这是data,然后呢,A stream还会有一个side output,那这里边我还是把那个output TB阿帕奇log跟那个定义一样,对吧?这里边给一个这个late一样的类型,一样的标签,把它拿到做一个print输出,这个是late啊,这就是一个完整的处理流程,我们三种保证都加进来,然后接下来我们可以测试一下,看看这个效果到底怎么样,对吧?好,我先把这个运行起来,当然如果要测试这个的话,我们最好就不要用这个文件读取数据了,你最好是一个流式的对吧,这个能看到一条每一条数据带来的变化,那这里边我们也不用卡夫卡了,那个有点重啊,我们直接就还是用最。
06:00
简单的,大家还记得这个socket文本流对吧,我定一个local host7777,起一个NC,然后做一个这个处理就完事了,好,所以接下来我们在这里来一个NC-LK7777啊,或者你随便给一个端口都可以对吧,8888,可能大家平常做测试的时候经常会用到啊,我这里边定义了一个7777,然后把这个代码启动一下,大家看诶,我们接接下来把这个窗口放在这里,我们准备输入一些测试数据啊,代码已经提起来了,接下来我们一条一条数据输入啊,大家看这里边我就呃,我就不测那个不同的URL了,对吧?啊,大家看到我这里边都是同样的URL啊,一条一条数据输入,最先输入的一条是10:25:49,这样一条数据,好。然后我在这里给一条数据来,大家看有一条data输出,然后没有窗口关闭对吧?呃,然后也没有任何的这个统计输出,也没有最后我们的那个排序数据输出,然后接下来我再给一个50,因为大家想到我们当前是那个整五整十秒对吧?诶,那这这里给50秒的时候,呃,这里给这个25分50秒的时候,是不是应该有窗口关闭呢?诶不是,因为我们当前你给了50这个数据,那现在的water mark是到多少呢?Water mark要减一秒,对吧?所以water只是到了49,所以即使我们知道整五整十秒的时候,50秒这里应该有一个窗口,但是现在还没关闭,所以如果我们想要让这个窗口关闭的话,大家看到这里边是不是要有一个51输入进来啊,对吧,五十一五十一数据进来之后,当前的watermark找到了50,大家看输出了一个聚合结果,对吧窗口。
07:52
关闭了聚合了,呃,窗口没有关闭,因为我们还有那个十道数据,对吧?但是窗口触发了它的计算操作,输出了当前的这个,呃,PRESENTATIONS1,为什么是一呢?50跟51,这都不属于50这个窗口啊,对吧?这两条数据都属于下一个窗口,所以这里边只输出了一个一,然后接下来呢,大家看到没有输出我们排序信息啊,哎,那是我们说的又延迟了一毫秒对吧?基于window end又加了一毫秒,那就是50秒加一毫秒的这个时间才会触发定时器,你现在的water mark只到了50秒,那当然没触发了啊,那怎么样去触发呢?再推进一下water mark,对吧,我这里边再给一个52的这个数据,诶,大家看到这里边给了52的数据之后,Water mark进展到了51,然后我们要去触发一个50加一毫秒的一个触发器,对吧,你现在watermark到了51,是不是已经超过这个时间了。
08:52
啊,所以当然它可以出发了,对吧?所以这里边我们输出来一个结果,当前这个URL统计热门度是一对吧?啊这个就这个过程还是应该大家是没有什么问题的啊,然后关键我们现在还想给大家测一测这个乱序数据的这种情况,那接下来呢,诶,我们再返回来给大家输入一条46的数据,给大家一条46啊大家看看输入46之后它的行为是什么呢?诶46这条数据data three输出一下,然后呢,又有一个聚合结果,大家要注意一下啊,聚合结果的这个窗口是哪个窗口呢?550对吧?829550,这是不是还是之前我们聚合之后的这个这个窗口啊,在之前的基础上一叠加变成了二对吧?哎,这里边变成了二,诶,那大家可能会说,那为什么现在当前这个没有对应的这个最后窗口,就是我们这个排名的数据的这个输出呢?诶这里大家要注意啊,就是我们。
09:52
前面做这个窗口延迟触发的时候,那是窗口本身定义的,我们那里就会去输输出这个结果,这个是肯定会输出的,对吧?而我们后边的这个排序数据的输出呢,是靠定时器控制的,这个定时器呢?哎,大家会想到是不是相当于之前我们已经触发过一次了呀,对吧,已经触发过一次了,然后这里边相当于我又有了一个这个输出结果之后,AJG这里边有了这个结果之后,大家想在代码里边是不是后边又会注册一个定时器啊,就在我们这个后边做这个process element的时候,在这里边我们这不是每来一个pay review count,什么是pay review count,不就是这里我们输出的这个结果吗?每来一个窗口统计的结果,我就要去注册一个定时器,对吧?所以现在是又注册了一个定时器,但大家想你当前注册的这个时间点不是应该是550对吧,50加一毫秒吗?现在奥M不是已经涨过了吗。
10:52
为什么它没有直接出发呢?正是因为我们后边定时器的出发必须要依赖于water,新的water mark来才能够推进它出发。之前我们讲watermark朝下游传递的时候,也给大家说过,朝下游传递的时候有一个规则是什么呢?本身是朝下游广播对吧?哎,那还有一个规则是如果我当前的这个watermark更新了,我才朝下游去去传递,如果没有更新,是不是下游就根本收不到新的water mark呀?哎,所以大家发现这里边就是相当于我下游呢,根本收不到water mark,所以这里边就没有触发它的操作,没有触发我们当前的这个定时器,对吧?啊,那所以得怎么样,怎么样才能够收到这个新的water mark呢?当然就是你把这个当前的时间再去推,推进一下就可以了,比方说我们把这个再来一条五三的数据,大家看现在是五二,最大的是五二对吧,Watermark是。
11:52
五一,那现在你给一个五三的数据的话,没复制上啊,我还是把这个copy一下,哎,接下来大家看给一个五三的数据,那是不是现在哎,当前的这个water mark就涨到了五二对吧?在之前的基础上又推进了water,现在就可以把这个输出了,现在更新热门度变成了二,对吧?啊这个行为稍微有点奇怪,但大家要稍微的理解一下啊,知道它到底是怎么回事就可以了,然后假如说我这里边给一个这个三幺的话,这里边啊,我在输入一条,再之前的一条数据给一条三幺的话,诶大家看这个就很奇怪,我这里边为什么会输出好多条这个窗口聚合结果呢?A,大家看输出很多条对吧?五零这个比较比较容易理解,我不是那个十分钟嘛,十分钟,那当然五零的这个窗口也包含三三十三十一秒的这个数据,对吧,这个是没问题的啊,然后那怎么45秒这里边也也输出了一个结果呢,而且40秒35秒都输出。
12:52
啊,对吧,为什么会有这种情况呢?哎,这就是我们说的啊,就之前我们是因为之前的这些窗口里边没有数据,所以并没有结果的输出,而现在呢?诶,大家看现在这里有数据了呀,为什么有数据了,因为当前的water mark涨到了五二对吧?呃,就是我们现在涨到了五二,尽管是50之前的这个窗口应该都关了,但是我们不是还定义了处理这个一分钟的迟到数据吗?所以只要是在这一分钟之内的来的数据,大家看啊,就是前面我们这个窗口,这个窗口应该是35秒关,对吧?那你现在是不是还在35秒等待这一分钟迟到数据的时间范围内啊,对吧?本来应该是35秒就关了,现在已经50多秒了,但没关系啊,我还等着呢,所以你来了一个31,它属于35秒该关的这个窗口,所以它还可以再去加进去,这里边又输出了一个一,同样道理,40秒的窗。
13:52
口它也属于对吧,这个,呃,45秒的窗口也属于,50秒的窗口也属于这个里边已经有三条了,对吧,别的都是一啊,那那同样你再来一个23的话,大家看一下这个行为,那就是来了一条之后,又好多个窗口都更新了,哎,你看这里面就是五五十这个窗口已经变成四条数据了,那四十五四十和35呢,变成两条数据,对吧?啊,然后还有就是我们这里边的三十五三,呃,这个30和二。
17:52
对吧,25分,我现在是进展到现在的时间是多少?10:25说water mark啊时间53秒,对吧?诶,那是不是我现在之前那个,呃,10:24:50的那个窗口就应该彻底关了呢?哎,对吧,这个当然没问题啊,这个窗口就应该彻底关了,所以属于这个窗口的数据,哎,那大家想呃,就是再来的话,就不会再再添加到追加到这个窗口里面去了,但是如果说我现在给一个10:24:49的数据,是不是它就直接是输出到测出去测输出流了呢?不是的,它不属于这,呃,这个就是它属于这个10:24:50这个窗口,这个是已经关了,但是它同时还可以属于10:24:55的是关闭的这个窗口啊,大家想想是不是这样对吧,因为这个十分钟呢,我们。
18:53
一个窗口长度十分钟的啊,所以它还属于这个10:25结束的这个窗口啊,啊,就包括它也属于我们当前10:25:50结束的这个窗口,对吧?甚至它还属于我们当前没到点的10:26呃的的窗口,它也属于对吧?诶所以当前它其实是还能收进去的,它不会直接输出到这个呃测出据流里边去,我们这里边真正测输出流里边输出的数据是什么呢?是所有窗口都已经它属于的所有窗口都已经关了,也就是说假如有一个数据,它属于的最后一个窗口是10:24:50的话,那这个这个数据所有它属于的所有窗口就都关了,对吧?这个数据就输出输出到测试数据流了,什么样的数据呢?那就是十分钟之前的数据对吧,这里边给大家举一个例子,就是假如说哎,就是我们当前这个窗口应该是属于什么呢?是10:14。
19:53
50秒到10:24:50对吧?有一个数据如果是10:14:51的话,大家想想这个数据它是不是属于的最后一个窗口,就是它,你再往后推的话,那就是10:14:55对吧?诶它就不属于了,所以它属于的最后一个窗口就是它连这个窗口都已经超过了一分钟的延迟,都已经关了,那是不是这个数据就真的没人要了,对吧?啊,他就会输出到测试数据流里面去,所以这里面给大家试一下啊,给一条这个十点,我给大家把这个还是复制一下当前的这个数据啊。
20:36
啊,这里边我直接改这儿吧,对吧,给一个10:14:51,大家看一下这个效果,这里边就可以把它直接输出到测试出流late输出对吧,单下来之后可以测试啊,如果你不给到这个时间段的话,它不会的,因为它还属于别的窗口,这就是关于这个乱序数据的处理,关于那个bug我们下节课再讲。
我来说两句