00:00
我们已经了解了怎么样在kid process function当中去注册一个定时器啊,前面我们注册的是处理时间的定时器,那就是调用timer service下的register processing time timer这个方法就可以了,那它的测试结果其实也非常的容易理解啊,因为是处理时间嘛,所有的事件都是按照顺序依次发生的。那我们想到,如果要是事件时间的定时器又应该怎么做呢?代码上很容易实现,那就是把这个处理时间改成even time就可以了。但是对于事件时间的场景呢,我们知道时间的推进它是依靠水位线去往前推移的,而水位线呢,又是依赖于当前数据本身里边带着的时间戳,所以这个时候如果我们测试的话就会发现。有可能出现一些比较费解的情况,接下来我们就来测试一下,看一看到底是怎么回事儿。所以接下来我们还是在当前的目录下边去新建一个scle object,这个就叫做event time。
01:03
Timer test。同样下面先创建一个密封法啊,那里面的流程的话,跟处理时间就差不多,我们直接把它全部copy过来。对应的依赖引入,我们这里需要把它改成下划线啊,这个完整的流程还是一样的,现在是事件时间啊,那么所以我们这里边首先要确定前面我们是提取的时间戳生成了watermark,这个没有问题,然后接下来呢,诶,那就是这里注册定时器的时候要调用的就是register time timer。然后前面其实还得更改啊,因为我们当前基于的是当前时间加五秒了,但是我们当前时间用的是processing time,这个就有点儿不靠谱,所以我们应该把它当前时间也换成事件时间。当前事件时间,那就是水位线嘛,Current water rock。然后如果我们直接这么测试的话,可能还会有一点问题,就是因为这个water mark我们是获取到了,但是呢,我们并不知道当前数据是哪条数据,所以我们还应该知道数据的时间戳是什么,然后结合watermark和时间戳,我们就会知道接下来定时器的操作了。
02:16
所以呢,接下来我们就在这个out.collect里边输出的时候啊,我们可以用一个字符串差值,哎呢,就是数据到达当前时间是Dollar current time,然后接下来我们还可以再加上。当前数据。时间戳是。这里边我们直接使用Dollar从数据里边提取,那就是value点。Time step啊,这样的话我们看的就会更加的清楚一点,好,那接下来我们可以运行一下,看一看效果怎么样。哦,现在我们的数据已经来了,诶我们看到啊,一来了之后,它就直接触发了一个定时器,而且这个时间呢,看起来非常的不靠谱,是一个绝对值很大的负数,是一个很小的时间。
03:06
为什么会这样呢?啊,其实我们仔细看的话就会发现这个数,这不就是之前我们在邦迪的out of order needs对应的那个实现里边指定的初始值吗?诶,所以我们会发现啊,第一个数据到达的时候,这个数据的时间戳,诶可能没有任何问题,84805,但是当前的时间,当前的water rock是什么呢?还是最初的初始值,这就是我们说的沃马要依赖数据到达之后,然后每隔200毫秒周期性的判断一下当前的最大时间戳,然后才能生成对应的水位线。那在这个数据到达之前和这个数据到达的这一时刻,那显然现在的water mark还没有进展。它就是最初的初始值,那就是一个很小的数啊,那所以接下来我们看到它注册的定时器是什么时候呢?我们看到75808,如果五秒之后的话,那就是加五就变成了70808,所以我们看等一下它直接就触发了,为什么直接触发。
04:08
因为我们当前的最大时间戳,根据第一个数据已经变成了84805,那接下来下一个200毫秒我们判断的时候,当前的水位线就应该已经变成了8480啊,当然了,实际上的话,我们这里升序排列的数据有一个减一毫秒的延迟,哎,那所以是84804,但是不管是什么,肯定之前这个定时器就可以出发了啊,因为超过这个时间了嘛,所以我们就直接有一个定时器的触发。然后接下来当前这个数据,第一个数据来了之后,他注册的定时器是什么呢?那应该是五秒之后,89805,诶这个89805什么时候触发呢。我们看到他要等到几条数据到来之后,在89859这个数据到来之后,然后触发了第一条数据注册的定时器啊,这是89804,哎,为什么804,因为我们是以当前的current time这个water mark来指定的当前的注册的这个定时器嘛,所以当前的watermark的话,比它小一毫米,应该是804啊,所以这里边我们就是89804这个时间戳。
05:17
我们看到这个定时器触发的时间点比之前到的这个数据的时间戳是要小的,并不是按照我们的时间顺序依次排列的,这是为什么呢?哎,这就是我们说的前面这个数据到来的时候,当前的时间其实只进展到了88845,那当然这个时候不会触发这个定时器,只有在这个数据到来之后,89859到来之后,现在的。水位线进展到了八九。858。诶,所以接下来他已经超过我们之前设置的这个定时器时间了,这个定时器才会被触发。这就是事件时间语义下,所有时间的出发点都是依据我们当前的水位线的进展来进行指定。
06:07
也是因为事件时间的这样一个特性,所以我们看到啊,就好像定时器的触发是稍微有一点延后这样的感觉啊,因为它总要依赖数据到来之后才能够去触发,如果说我们这里的数据更加密一点的话,那这个延后的时间就可以忽略不计了,这就是关于事件时间定时器的一个简单的测试,如果说我们觉得这个例子还是有点看的不太清楚的话,我们就还可以啊,自定义一个数据源,不要用这个click s了,我们直接把这个停掉,在这里自定义一个数据源,比方说我们就叫。Custom source。自定义,哎,那后边这里边我们就得去做一个实现了。Source extend source function。啊,那这里面我们当然发出的数据还是event数据了。必须要实现两个方法,一个wrong,一个cancel,我们这里只是想要测试水平线,哎,所以也不需要去指定cancel,也不要指定对应的那个循环的标志位了啊,不需要循环生成数据,所以这里边呢,核心的逻辑就是直接发出测试数据。
07:15
测试数据,指定它的时间戳就可以了嘛,哎,所以这里就是ctx直接collect,那这里面我们就生成一个event。呃,里边的话,比方说user随便给一个marry。然后点击了一个。后面。第一秒的时候发出了一个数据。这里为了让我们看的更加明显一点呢,不要一下子输出这个所有的数据啊,啊,那我们这里可以间隔一段时间,所以我们可以。间隔五秒钟。那这个的话,我们就直接调用thread.sleep5000啊,休息五秒钟,然后接下来。我们不是注册了一个五秒之后的定时器吗?哎,所以我们接下来继续发出数据,我们看一看间隔了五秒钟之后,当前这个定时器会不会触发,先看一下这一条,所以。
08:09
首先还是啊,继续。发出数据。那这里我们就发出一条两秒钟的数据。然后接下来,哎,那当然可以继续发出了,同样我们还是啊,继续去休息五秒钟。然后继续发出数据。其实我们知道啊,当前的这五秒钟,这是处理时间,其实对我们定义的事件时间定时器的触发完全没有影响啊,那所以这里边如果我们数据的时间戳只到了两秒的话,肯定不会去触发的,那什么时候能触发呢?那应该是五秒之后,诶,那我们想到了,那应该是。六秒这个数据来了之后就应该触发了,哎,所以这里我们可以再去做一个测试。这里SLEEP5秒。接下来我们就可以直接做一个运行测试。
09:03
运行起来看一看结果怎么样,诶我们看先输出的是两行数据,一个数据到来了,第一条数据来了之后,定时器被触发了一个,然后呢,哦,第二条数据到来了。没有触发定时器,第三条数据到来了,触发了一个定时器,然后触发了最后一个定时器,结束。所以我们看到三条数据注册的当然是三个定时器了,他们触发的时间依次是第一条数据来了之后。接下来马上第一个定时器就被触发了,然后第二条数据来了之后呢,没有触发任何的定时器,第三条数据来了之后,触发了第二个定时器。最后有一个定时器是在我们程序结束的时候才会被处罚。这到底是怎么回事呢?这里我们可以做一个解释。第一条数据到来之后,他注册了一个定时器,但是我们的逻辑是使用当前的current water mark,以它为基准加五秒钟注册定时器的,那当前的water mark是什么呢?哎,我们说一开始没有数据的时候,Water mark初始值是一个很大的负数,哎,所以这个时候water马是这样的,那么它注册的定时器呢,就是以它作为基准加五秒,那就是70808。
10:21
所以我们看到啊,1000这个数据来了之后,接下来我们说200毫秒生成一次嘛,生成的水位线很显然就是999。哎,所以接下来很快就直接把第一个定时器触发了,因为这个定时器非常小嘛,是一个很大的负数,那999已经超过它了,当然这个定时器就可以触发了,然后接下来。等了五秒钟,哎,我们会看到这个过程当中啊,这个五秒钟的等待,这就是当前系统时间的等待,等待的过程当中没有任何的定时器触发,然后来了第二条数据时间说是2000。这条数据到来的时候呢,诶当前的。
11:02
Water mark还是999,还是上条数据引发的water mark变化,这条数据引发的变化还没生效啊,那所以我们看到有这样的一个滞后,然后接下来呢,呃,它没有触发任何的定时器,因为我们知道他所注册的定时器,那就不再是很大的负数了,而是当前的water mark基础上再加五秒钟,所以他注册的其实是。5999这一时刻的定时器。哎,那所以当前当然没到啊,不会触发,所以接下来要等第三条数据,第三条数据又是五秒钟之后。6000这个数据到达了,它到达的时候呢,当前的时间戳诶没变,那是基于第二条数据到达之后,当前时间戳就变成了1999。但是他注册的定时器是5999啊,所以要一直等着,所以我们看第三条数据来的时候呢,水位线watermark还是之前的,诶,但是接下来最大的时间戳变了,所以之后水平线就更新了,就更新到了5999,所以就直接触发了我们第二条数据所注册的这个定时器。
12:13
好,当然了,第三条数据所注册的定时器呢,是基于它当时的水位线auto mark,再加上五秒钟注册的是6999的定时器,那这个时候什么时候出发呢?那要等到整个程序运行结束,在flink里边,整个程序运行结束的时候,它会默认将当前的水位线推进到长整型的最大值,就是一个很大的正数,所以这个时候呢,所有注册好的定时器就全部会被触发,诶,所以这个时候就输出了当前的触发结果。这就是关于这个具体的一个解释啊,当然了我们还可以把它做一点更改,就是你像我们当前有一个困惑,就是每一条数据来了之后,它不是基于自己的时间戳注册一个定时器,它是基于当前的watermark注册,所以就很奇怪,那假如说我基于自己的时间戳去注册呢?啊,我们看可以这样啊,上边注册定时器的时候,不要使用current time,而是使用当前的时间戳,当然时间戳我们可以从value里边点time Sam去提取,或者我们也可以用。
13:17
Ctx里边有一个time step,直接这样去提取也是可以的,接下来我们再来运行一下,看看效果会有什么不同。好,现在诶第一条数据来了之后,没有定时器的触发,我们看它就一直等在这儿了。第二条数据来了之后,没有定时器的触发,也是等在这儿了,然后第三条数据来了,同样没有定时器的触发,一直等到程序结束的时候,三个定时器全被触发了。哎,这又是为什么呢?首先比较好理解的是这个水位线还是之前的解释,哎,那就是在数据到来之前,水位线是以之前的最大时间戳减一来进行处理的,哎,那所以呢,总是有一个滞后,所以1000这个数据来了之后,后边的水位线变成了999 2000这个数据来了之后,后边水位线变成了1999,诶,那6000这个数据来了之后,后边啊,注意后边的水位线就找到了5999。
14:19
但是。我们当前注册的定时器,第一个数据来了之后,注册的时间点就是6000,哎,那所以6000当然就不会被5999对应的水位线去触发了,所以要一直等到程序结束的时候,推进到最大的那个长整型的数,才会触发我们这里注册的所有的定时器。好,那为了印证我们这个想法,也可以在这里边多来一条数据,比如说我们在这里把这个复制一下。多来一条数据,这个数据它的时间戳不用大,只要多一毫秒来一个6001,我们再来运行一下,我们就可以看到不同了。
15:01
好,现在第一条数据到来五秒钟之后。第二条数据会到来,诶我们看到他们都不会去触发定时器啊,那只有注册定时器啊,他注册的是六秒钟的,他注册的是七秒钟的,他注册的是12秒,诶我们看6000这条数据来了之后,不会触发定时器,但是6001这条数据来了之后,他就会把之前第一个数据注册的6000的定时器触发了。因为我们有一个减一的操作嘛,当前的时间戳减一就是对应的水位线,那接下来200毫秒生成一次,以当前最大的时间戳减一作为水位线,当然就可以触发第一个定时器。这就是关于事件时间定时器的用法。
我来说两句