00:00
现在我们可以测试一下这个在启动之前,我们需要先把1102上的NC先提起来。7777,好,把这个起起来。这边启动。我们直接把它分一下屏,首先我现在把这个NC提起来了啊,现在代码也已经提起来,那么我们来把数据做一个输入,好运行一下,呃,那这是第一条数据了。其实后面的这个结果我们都已经知道啊。这几条数据不会有任何的影响。这是前三条数据啊,那我们从这个Mary的第四条数据开始,就直接把它调大一点吧,我们知道第一个窗口应该是零到十秒,所以是应该正常来讲啊,我们会想到十秒的数据来了的,来了的话,那么就应该关闭零到十秒的窗口了,就是至少它应该是输出一个结果了啊,那现在我们因为watermark有延迟时间,延迟两秒钟,所以那就不是十秒钟准时能够能够输出数据,而是应该。
01:15
Copy下。而是应该在12秒的时候才输出啊,事实上我们在处理这个的时候,你如果给11秒,或者我们现在因为是毫秒数嘛,你可以后面给很多位,比方说11999。大家可以看到一直都是不触发的,然后只要这里变一下,变成12000,也就是12秒整的时候,大家就看到了这里触发了啊,当然现在我们的这个URL只有三个,因为只有前三条,爱丽丝鲍b carry,他们的这三条是在零到十秒之内的,从十秒开始,这已经属于下一个滚动窗口了,所以有三条统计数据result出来了,然后现在零到十秒,按照我们正常的想法。
02:03
它就应该关闭了,哎,那如果这个时候,假如说。假如说我来一条数据是八秒钟的数据,那大家说现在他这个数据还可以正常处理吗?哎,其实是可以的,因为当前的窗口并没有关闭,它只是到water mark涨到十秒钟了,只是这里边触发了一次计算而已,输出了这里的结果而已。啊,那假如说我这里边有一个8000的话,8000毫秒,那么就是八秒钟的数据,我们看一下。大家可以看到直接有一条数据,诶,这是这个product Mar的这个PRODUCT200啊,那然后直接就输出了对应的一个结果。因为当前它属于的那个窗口零到十秒,窗口已经到结束时间了,所以它是个迟到数据,迟到数据呢,来了之后就应该马上触发窗口的计算。然后他直接就输出了COUNT1。
03:02
啊,那之前是没有这个200的对吧,PRODUCT200的,所以它是一嘛,那如果说我们再来一个的话,比方说这里再来一个9000。那当然了,它就又触发一次计算又来一个,这里变成了二是在之前的基础上继续叠加。那之前这个是没有的嘛,那如果是之前有的呢,比如说我们这个像前面这个home home这里面统计出来是已经有一了,那比方说我再来一个Alice home呢。Copy一下。啊,当然这里面其实你给1000都是可以的啊,就还给这一个,呃,一第一秒的数据我们看一眼。大家可以看到Alice home,是不是点home就变成count变二了呀?之前是一,现在变二了,两个它还在叠加啊,那如果说我们把这个Bob啊。如果觉得之前这个数据都一样,大家有点疑惑的话,我们再把这个改成home,然后比方说随便来一个3500。
04:05
啊,3500跟之前刚好一样了啊3600。我们可以看到来了Bob的一条数据点home.home这个URL can't变成了三。它在不断的叠加,在之前本身这一个窗口计算结果的基础上,在不停的叠加。啊,那大家可能会想到这叠加到什么时候为止呢?就是我们说的有一分钟的延迟时间,所以他一直在在叠加,一直在等着。啊,那这个就有一个问题,那我们这在这测试都测了半天了,呃,难道还没到一分钟触发它结束吗?啊,大家要注意我们这个一分钟,他也是因为我们当前是事件时间窗口。那它的结束时间触发是靠水位线来触发,那现在呢,延迟的这一分钟也应该靠水位线来触发啊,所以那我们自然就想到了之前第一个窗口是零到十秒。
05:05
这个窗口如果再要延迟对应的啊,如果要是再往后要延迟一分钟的话,那是到多少秒的时候才会真正意义上的把这个窗口关掉呢?那是不是要到70秒的时候,十加60嘛,一分钟60秒啊,是不是要到70秒的时候才会真正意义上的关闭当前这个窗口啊,啊,所以基于这样的想法,我们就考虑要让当前的时间推进到70秒啊,那这个也是每一个用户的数据,其实我们发现了无所谓对吧,因为我们关心的是反而是URL嘛。70秒,那就700007万,我们来一个,这里来了一个数据之后,大家会发现啊。好像,呃前面的没有任何的区别,来了一个carry啊呃,这个100ID100,后面我们看的话,你会看到这个变成了70秒嘛,一分零零十秒,然后下面呢,又多了一个result,诶为什么它又会多一个result呢?而且是product ID,呃是200 count是四,这是哪个窗口呢?我们看看。
06:13
哦,他这里关的是八点,呃,零分十秒到20秒的窗口。十秒到20秒的窗口,为什么是70秒的这个数据来了之后关闭的呢?之前我们的水位线是进展到哪儿了呢?最大的时间戳是12秒,所以之前的水位线进展到的是十秒。所以之前十到20秒的窗口当然是没关的。那现在呢,突然一下跳变了,来了一个70的数据,哎,那大家会发现70的数据来了之后,其实它的water mark是68秒,那68秒都来了,前面是不是十到20秒的窗口就应该关闭了呀?啊,所以我们看到十到20秒的窗口里边有12344个数据,所以COUNT4。
07:06
而且都是product ID是200啊,所以这个就输出了一条result count为四,诶那你不光是十到20秒的窗口应该关,20~30秒的窗口也应该关呀,30~40秒也应该关呀,是的,没错,就是之前只要是超过了它结束时间的那些窗口都应该关,其实不能说都应该关,都应该触发一次计算,都应该触发一次计算,但是啊,但是我们这里边别的窗口里面没数啊。那你这个触发也没用啊,所以如果没有数,没有数来的话,我们知道那个窗口根本就不会创建,对应的那个桶根本就不存在,当然也就不存在,呃,所谓的触发计算和关闭的这个操作。所以现在我们就已经把前面的东西都搞定了,现在的水位线进展到了68秒。那现在是不是已经可以啊,就是假如说啊,再来一个零到十秒的数据,是不是就已经没有办法再触发计算了呢?诶我们可以再试一下啊,现在已经到68秒了,水位线。
08:10
就这个carry吧,对吧,这不也是刚好是零到十秒内的吗?3.5秒嘛,我们来一条。大家看还在叠加。Product id100之前我们不是只有一条这个carry这条数据吗?那现在它还在叠加count变二了,对不对,这还是在叠加,你如果再来一条的话。我们来一个6500。当然这个就变三了,对不对,当年URL没有变嘛,Count变三,还在不停的叠加啊,所以自然我们想到了你现在的water mark是到68秒嘛,没有到我们想要真正结束的那个70秒啊,那这个我们还可以继续继续一个一个去测啊,比如说。直接把这个71。
09:00
再来一个啊,然后我们。把六五的这个再copy一下。现在的water进展到了69秒,哎,所以同样这里边我们还能继续叠加,再来之前的那个迟到数据,你看还能继续叠加一分钟呢。哦呃,他现在看的变四了,但是如果我们把。当前的时间变成72000,现在的water。达到了70秒,70秒的话,真的到了零到十秒这个窗口延迟一分钟之后的这个时间点,那这个时候理论上来讲,所有的数据,呃,就是这个窗口,它就应该彻底关闭了。整个状态全部清空,那如果再要来这个时间段内的数据又会怎么样呢?再来一个数据,大家看到它就进入到了late里边。当前不会在result里边再去叠加了,而是放到了late测输出流里边去处理。
10:05
啊,所以大家就知道了,那我们之前你看到的这个零到十秒内的这个窗口,要注意的是,这里做这个更新叠加的时候,它是只针对当前URL去做一个更新,它不会把我们其他的那些数据都全部输出一遍啊,所以你如果想找之前所有的数据的话,那你还得看啊ID100的这个COUNT4啊,这个是零到零到十秒的,这是一个结果,COUNT4这是最终结果,然后前面还有这个我们看啊零到十秒,诶这也是一个把它放大吧。这也是一个啊,这个是个也是ID100,那那这个就没有意义了,ID200的这个啊,这是十到20秒的,这不是我们当前零到十秒已经关的这个啊,零到十秒home a3,这是一个最终结果。然后前面还有这个也是home,没有意义。零到十秒,ID200的这个COUNT2,这也是一个最终结果,哎,所以我们可以把所有的这些最终结果汇总起来,写入到一张表里面去,然后如果late这个数据又来了的话,尽管现在它已经不能自动在窗口基础上叠加了,但是我一看这个时间,哦,零到十秒,所以我就看零到十秒那张表有没有product ID等于100的呢?
11:19
一查发现有product ID等于100 count是四,那我就再把它叠加上去,Count等于五不就完了吗?更新一下这个数据就完了吗?啊,所以大家就看到了,这就是用一套系统实现了整个拉姆达架构的这样的一个做法。啊,这个测试的话一定要自己仔细一点,因为有三重机制在处理它。大家一定要推断清楚,到底当前你考虑的是哪个窗口的数据,然后当前窗口的数据,它的结束时间到了之后,会触发一次计算。如果再来。迟到数据,但是还在我当前这个窗口允许迟到数据的时间范围内的话,那它还会继续触发计算,而且是在之前的基础上叠加。
12:04
窗口是不会关闭的,但是如果说窗口的结束时间加上延迟的允许延迟的时间也已经到达的话,那么接下来就真正的关闭了。而这里所说的时间到什么位置,这个时间指的是什么呢?都是water mark,因为water是现在的事件时钟,你考虑water mark的时候,就要考虑到当前数据时间戳和water有一个延迟的关系,你还要把这一重关系考虑进去。所以这个在测试的时候非常考验细心以及对这些概念的理解。啊,这就是关于flink处理迟到数据的手段。
我来说两句