00:00
毅然测试了processing time timer,那当然也应该测一下even time timer啊,那even time timer他的行为又应该是什么样的呢?啊,直接来创建一个class,接下来就是event time。那。Test。然后我们直接就把这个copy过来吧。干脆直接粘这儿吧。好,飞过来,下面直接写啊,Execute。还有个画括号,然后接下来的这个操作,其实就跟前面processing time timer基本上是差不多的啊啊,我干脆就先把这个全copy过来吧。我现在要既然要测这个事件时间定时器,那么在这里边定义的时候。这里就不应该获取当前的时间,那我这里边就获取当前的current water mark嘛,啊,其实大家会想到这个获取current water mark可能会有点儿问题,因为之前我们测试已经发现了wal mark是一开始是一个极小的负数。
01:07
然后呢,它总会滞后,总是一个数据来了之后water mark更新是会更新上一次数据来了之后的那个water mark,所以它并不是当前数据带来的那个water改变,诶那什么样的数据会更加的靠谱一点呢?那其实他Sam还更加靠谱一点,对吧?啊,因为当前我们都是升序数据嘛,M sammp本身不就是当前数据自带的那个时间戳吗?哎,所以这个我们可以看的清楚一点啊,然后这里还可以加上啊,我们在后面再加一个吧。一个啊,那当前的。我们直接就写一个这个ctx,直接把那个长整形的数放在这就得了,Service第current water mark,哎,这样大家看的清清楚楚的就是到达时间是多少,然后当前的water mark是多少,所以这个到达时间其实我们不应该叫到达时间了啊,其实应该是时间戳。
02:08
真正的这个当前的事件时间,其实watermark嘛,在后面的,然后接下来注册的时候,当然也不要用time timer了,要用even timer。然后后面on timer里边触发的时候,我们直接提取当前的这个time stamp对吧,这个是没有问题的啊,这应该也是当时注册的那个时间点啊,那整体上来讲就这么简单啊,稍微改一改就可以测了,接下来我们直接运行一下看一看。好,接下来还是这个推在不停的运行。哦,大家可以看到啊,这个数据不停的到达,你看这个auto一开始就是负数,很大的负数,对不对,然后后边这个更新的过程当中啊,每一个这个water,其实你看这个427,这是426是跟上一个数据有关的,跟当前的这个时间戳,其实是有一个滞后的,哎,所以这个是有这样的一个差距,来我们先停掉啊,我们来分析一下。
03:04
第一个触发的这个定时器是carry的,这个定时器触发的时间是50分23秒427毫秒,哦,这个直接一看的话,很明显,这不就是它吗。就是第一条数据来了之后,注册的十秒后的定时器,因为我们就是按他这个时间戳去定的定时器,那定时器的时间戳当然没问题,当然是十秒之后,那但是如果大家仔细看一下的话,你会发现。这个定时器触发的时候。他是跟之前的这个数据都是按照顺序来的吗。其实不是的。其实你看这里边,前面有一个数据是50分23秒524毫秒。这条数据在前,而427毫秒的这个定时器在后。为什么会这样呢?其实我们知道事件时间它的时间控制的机制的话,这个非常好理解,它就应该这样,为什么它是必须要water mark涨到对应的位置才可以触发当前的定时器,当前这个数据到达的时候,这个时间戳是多少呢?时间戳才到这个512,那其实是上一个数据带来的watermark的改变,其实只到了22秒512毫秒,那你这个定时器能触发吗?这是23秒的呀,触发不了。
04:27
所以在这个数据到来之前,根本这个数据就触发不了,只能是他先来,他来了之后,然后现在的water是不是就变成了23秒523毫秒啊。啊,那所以这个时候我们可以触发一下啊,这里如果大家看的不是很清楚的话,我们可以在。在当前触发的这个地方也加上。啊,我们可以用ctx service.at currentmark,然后接下来我们看一下。
05:02
先运行一下啊。当然可能跟刚才的那个数据就不太一致了,因为我们是随机生成的嘛。好,接下来我们看一下啊,还是按照这个数据,一个一个到达十秒之后,第一个数是53分25.307对吧,好,我们直接停掉吧。我们看一下,现在就得找了啊,找一下A定时器第一个出发的啊,爱丽丝53分25秒307,这里触发是53分35秒307,没有任何问题,哎,那这时候他的water mark是多少呢?大家看394。这个water明显是因为前面这个35秒,395毫秒这个数据到来而引发的变化,所以当前的water mark是这个。已经到了35秒395毫秒了,而呃,现在的这个触发的定时器呢,它定设定的时间是35秒307毫秒。
06:04
这就是我们之前说的啊,只要是比当前的水位线小于等于它的事件都应该到齐了,都应该触发了,所以比它小,当然这个时候由他来出发啊,这就是关于这个事件时间的定义。当然了,如果说想要测试的更加的详细一点的话,我们这里也可以给大家做一个另外单独的一个定义啊,比方说我们在这儿单独定义一个测试的数据源,之前我们那个click s已经是自定义的数据源了啊,我们这里再单独定义一个,为了测有限啊,那大家还记得我们之前定义S啊。我把它implement source function。里边的类型当然是event里边必须要实现的方法,一个wrong方法,一个cancel方法啊,那这里面其实我们都根本不用考虑那个无限循环看测方法要怎么停了,直接就发出我们想要的那个测试数据就完了啊,所以在这里我们直接直接发出测试数据,那这里的测试数据用ctx collect。
07:13
啊,有一个event啊,那前面的这个event我们应该有过对应的代码,你有一个。好,直接把这个event直接输出去啊,那个Mary的一次点击,它是一秒钟的一个时间戳啊,那这里为了让大家看的更清楚一点,这里中间我们可以停一会儿s sleep,比方说我们停个五秒吧。然后接下来啊,我们设置一个,呃,这个这个其实不用设置啊,呃,这这这s function这里是不能直接设,就是像这个定时器之类的东西的啊,我们只能是去去就是sleep停一会儿,让他等一会儿再去发出数据,这个是可以做到的啊,然后接下来我们再来一个数据,比方说啊,那我们这个随便给啊爱丽丝。
08:00
然后后边我们发出一个11秒的数据啊,大家会想到这个数据,那就相当于已经把它当前的这个water mark就要推进到十秒之后了啊,就是一秒,然后当前这个water mark是一秒减一毫秒,那如果11秒的数据来了之后,那就是11秒减一毫秒了啊,然后接下来我们在这儿再评一下th sleep。停五秒钟啊,那来了这个爱丽丝这个数据之后,当前我们第一条数据上面不是要注册定时器吗。这里会被触发吗?我们先看一下啊,这个还是click source customer source。运行一下。好,大家看Mary的这个数据来了,五秒之后我们要再发一条数据,诶诶,这个数据来了,大家看到watermark是多少啊?999,对不对,当前是,然后突然一下子全触发了auto mark到了一个很大的数,这又是怎么回事呢?
09:02
大家看这个数来了之后,他要等五秒,等五秒是不是就结束了呀?哦,结束的时候,那这怎么办呢?Flink的流处理程序是这样的啊,如果当前的输入数据都已经结束,整个数据集处理完毕,那么如果是事件时间语义的话,会把watermark直接推进到。长整形的最大值,很大的值。那么既然推进到很大的值,是不是所有的定时器就都被触发了,都肯定比它小嘛,所以就一下子全部触发啊,它是这样的一个节奏,那为什么我们这里这个来了一个爱丽丝11秒的数据之后,我这里不能把它触发呢?那大家会看到这个数据来了之后,当前的water mark是999。那大家会想,哎,那肯定呀,你这个auto mark这还没到一秒呢,这肯定不会触发,诶不是的啊,这是当下就这个第二个数据来的时候,处理这条第二条数据的时,他的water mark是999,处理完这个之候,那是不是它就会推进water呀,对吧?所以你要是隔一秒钟之后再去打印这个water的话,你就会发现它至少这个应该是10999了啊,就应该是这个减一毫秒嘛。
10:10
但是大家就会想到了,10999,那也不是11秒啊,它的触发时间是11秒,那你10999肯定是触发不了他的,所以这里边我们可以继续。类似的做一个测试啊,这里边我给一个Bob。Bob的数据,这边我来一个10001多加一毫秒。他接下来就应该会把这个watermark推进到11秒整,那11秒以及之前的所有事件就都应该发生啊,Water到这个就会把这个定时器触发,所以接下来我们运行一下,看看是不是符合我们的预期。然后大家看到这第一条数据啊,一开始沃是最小值,五秒之后第二条数据999。第三条数据,第三条数据一来,第一个Mary的定时器是不是直接就触发了,所以当时确实是还触发不了11秒的定时器,但是这个数据一来了之后,它就可以触发11秒的定时器了。
11:12
啊,那同样这两个字是到最后五秒之后,所有的数据输入完了,Watermark进展到了最大,才把最后的两个定时器触发,啊,这就是这个事件时间与一下定时器的它的一个行为,这个东西还是很好玩的,可以实现非常非常多非常灵活的功能。
我来说两句