00:00
我们已经了解了处理函数的分类,那在这些处理函数里边,哪一种是最重要,我们应该重点掌握的呢?诶,那很明显,那就是实际应用最广泛的,我们就要重点掌握,那这里边应用最多的是哪个呢?很显然那就应该是基于最基础的数据结构,哎,比如说这里的process function,它是基于data stream的,还有k process function,它是基于k stream的,这两种数据结构非常的基础,所以在实际应用当中也是最为普遍的。那这里边的重中之重其实是这里的K的process方式,因为我们知道啊,如果说我们后边想要去做聚合开窗之类的操作的时候,那必须首先要做一个KBY基于k stream才能够进行进一步的转换,哎,那所以一般情况下啊,我们的代码里边处理的都是k stream。而另外还有一个呢,之前我们也提到了,在一般的process function里边,它可以去调用自己的time service这个定时服务,去注册定时器,但是事实上这个是无效的,这个是没有用的啊,那这里的on time方法其实也没有意义,所以真正想要使用定时服务,想要去注册定时器,指定什么时候去处理什么操作,只有在kid stream里边才能够执行这些操作。
01:24
所以可以说k stream是我们最重要的数据流类型,而k process function也就是最重要的处理函数,所以接下来我们的重点是要介绍它,我们可以把它翻译成按键分区处理函数。简单来说的话,哎,我们也已经知道了啊,就是在代码里边,如果说我们基于一个当前的data stream,先做一个K。得到了一个k stream,然后我们在k stream的基础上去做一个process操作,哎,那这里边要传入的就是一个K的process方function,那K的process function我们点进去会发现啊,它跟process function没有直接的继承关系,但是呢,里边的形式几乎是完全一样的,我们可以看到它也是一个process element的方法啊,这里边同样三个参数,当前的数据。
02:14
上下文啊,另外还有一个collector类型的al啊,那所以我们使用的时候也是啊,基于当前的数据,每来一条就触发一次当前的process element这个计算可以结合上下文里边的信息,调用out.collect进行输出啊,那另外还有一个抽象方法叫做on timer,在K的process方式里面真正可以派上用场了。啊,那另外这里的这个上下文是什么呢?诶,那我们看context里边可以获取当前时间戳,另外最重要的有一个timer service啊,那另外还可以使用这个output这个方法去进行测殊出流,这里多出来的一个就是还可以捕获当前的K,因为我们是K的process function嘛,有K的定义,当然是可以捕获的。
03:02
所以在前面我们看到它的泛型里边,比之前的process function也多了一个参数,那就是K当前键的类型啊,之前我们那个process function就是I和O输入输出,现在多了一个键。啊,那这里边最重要的呢,肯定我们还是看这个time service了,在这里边我们点进去看啊,这个接口就完全一样啊,跟process function那边使用的time service是完全一样的,这里边的方法能够调用的方法就是这六个。Current processing time current water mark,然后register啊,注册定时器两个方法,然后delete定时器两个方法,所以整体来分的话,这六个方法可以分成两大类,一大类就是基于处理时间processing time,另外一大类就是基于事件时间even time。哎,那在这个基础上呢,我们又可以分别对他们进行不同的操作,一个是获取当前时间,哎,那就是current什么什么,这里的current watermark whatmark就是even time,就是当前的事件时间,然后呢,可以注册定时器,可以删除定时器啊,所以这个最重要的,其实这里的定时器操作像前面的补货当前的时间,那一般的process function也可以做。
04:18
而这里的注册定时器和删除定时器就只有在K的process方式里面才能够操作,哎,那所以接下来我们就来做一个代码中的测试啊,那当然了,这个测试的话,我们主要是想测这个定时器的行为啊,那我们就分成两部分吧。先测一下处理时间,再测一下事件时间啊,那我们分成两部分,首先用一个。我们要测的是处理时间processing。Time timer test。那方法先写出来,开始的流程当然还是完全一样了,我们直接照抄,创建执行环境,然后读取数据源,把上面的下划线引入,方便后边做影视转换。
05:05
接下来我们想定义处理时间的定时器,那要定义定时器的话,就必须使用key process function,那当然先要做key by了。那这里我们基于什么KBY呢?其实基于什么都行,甚至如果说我们不想单独指定当前的K的话,诶,那就是像前面说的啊,我随便给一个常数也是可以的啊,比如说我们这里边直接给一个啊,给一个处。布尔类型的常数,哎,那或者说我们给一个字符串类型的常数都是可以的,只要做了KBY,得到了K,就可以去使用定时器啊,至于我们这个到底分开了没有,这个并不重要啊,所以接下来就可以调用process方法里边就要去实现一个自定义的,我们直接用匿名类的形式来实现,那就是。Key的process function。里边的类型我们知道是KIO啊,那当前的key的类型呢?这是一个booing类型啊,然后输入的数据类型event输出的数据类型我们还是简单一点,STEM输出一条信息就好了,接下来里边必须要实现的就是一个process element方法啊,那当然了,里边的处理逻辑我们就可以自己去定义啊,关键是要注册一个定时器,我们先来获取一下当前的时间吧,呃,因为当前这个CX,我们可以调用timer service去捕获current processing time。
06:28
所以我们可以把它拿到,哎,比方说我们把它叫做current time,然后接下来。直接做一个打印输出,哎,那我们这里就out.collect我们做一个描述,当前其实是有一个数据来了之后,就会调到这里的process element方法,然后我们可以输出当前的时间到底是多少,哎,那所以数据到达,然后当前时间是。后边我们就直接加上current time啊,我们直接把它跟在后边,然后接下来呢啊,我们还可以做一些操作,那就是可以注册一个定时器,注册一个。
07:08
比方说五秒之后的定时器,这里是处理时间,哎,所以我们就是处理时间与以下的定时器ctx调用timer service去register,注意是processing time timer,然后接下来里边呢,需要传入一个长整型的值,这就指定了当前定时器要触发的时间戳,哎,那所以这里边我们看到啊。所谓的定时器其实就是指定了一个时刻,指定了一个时间戳,然后这个时间戳呢,在弗link的底层就会被保存在一个优先队列里边,排着队等待执行,哎,那那所以我们可以认为这个定时器啊,它就是我们在k stream上定义好的一个状态。哎,那所以之前我们说这个基于k stream去做开窗统计,去做聚合的时候,它不是针对每一个K。
08:06
都会单独的进行一个统计吗?那所以其实就是针对每个K都有对应的自己一个状态。那当前的定时器呢,既然是基于k stream的,当然也是针对每一个K都是有效的,所以当前如果我们定义这个定时器的话啊,如果我们这儿有很多个不同的K的话,那是各自定义各自的,每个K都可以定义在什么时候不同的时候去触发对应的事情。只不过呢,我们这里边的处理逻辑不用区分,他们都是按照同样的逻辑,到点的时候执行就行了,只不过注册定时器的时候,他们是分开的,而且对于k process function里边定义的定时器而言,这里定义的时间戳就是它的唯一标志,也就是说在一个时间戳上只能有一个定时器。那我们如果要是重复注册呢,在同一个时间点上同重复注册定制器会发生什么呢?诶,那就是相当于还只是一个,相当于做了一个去重,后边我们的那个on timer方法只会被调用一次,哎,那所以这样的话,有时候我们就可以非常肆无忌惮的去注册定时器啊,怎么注册都可以,比如说这里边我们定义一个五秒之后的定时器,那就是基于current time去加上一个五秒钟,诶注意这里的定时器时间戳也是毫秒,所以得五乘以1000,哎,那这样的话就是五秒钟之后的一个定时器。
09:33
好,这个是注册完了,注册完了之后,那接下来这就完了吗?当然没完啊,那当前我们只是有了定时器了,有了闹钟了,那闹钟响的时候我们到底要干什么呢?所以必须还得去定义。定时器触发时的执行逻辑。那这个逻辑就是在所谓的on timer方法里面去实现了,那这个on timer方法我们看到啊,它的参数。
10:01
这里有一个长整型的time step,这就是这个定时器触发的时间戳啊,因为我们说这个定时器我们可以随便注册嘛,我注册了一个五秒钟的,那下面还可以再注册一个十秒钟之后的定时器,那到底是哪一个定时器触发的时候到我们这里来执行on方法呢?要注意每一个注册的定时器,它出发的时候都会调用到这里的on time方法。啊,那这就有一个问题了,那我怎么知道当前到底五秒钟该执行什么,十秒钟该执行什么呢?这个逻辑怎么区分呢?那就通过当前的time stepmp去做一个判断就好了啊,就是你判断一下time,如果是五秒之后的,那你就执行五秒之后的逻辑,如果是十秒之后,那就十秒之后的逻辑啊,所以我们这里的timemp非常的重要,这就是定时器的唯一标志。然后后面我们看到还有一个CX,这个CX呢,很明显了,它就是kid process里边定义的一个on啊,就是在这里边呢,它也是一个上下文,这个上下文里边的东西呢,相对就少一点,它是有一个时间域,另外呢,还有一个方法可以获取当前的K,这个时间域里边呢,哎,主要就是看当前到底是事件时间还是处理时间啊,判断一下当前的时间语义啊,所以这里我们能做的事情就都看到了啊,那接下来呢,我们就还是。
11:25
在当前的on time方法里边去定义一下自己的处理逻辑,我们也是直接输出一条信息吧,alt.collect直接输出,那就是定时器触发触发。时间为。后边可以跟上当前的时间戳。哎,那这就是我们简单的一个测试的过程啊,那如果说我们想要运行执行它的话,那后边还需要去做一个打印输出,然后env execu执行起来,哎,这就是我们完整的一个测试流程,接下来我们可以直接运行看一看效果怎么样。
12:03
现在我们定义的是五秒钟之后的数据。啊,数据到达我们看啊。现在数据不停的来,接下来我们看。因为我们是五秒钟之后嘛,所以我们看啊,已经有了五条数据之后,接下来就有一个定时器被触发了,它的触发时间是什么呢?诶,我们看它后边的时间是91374啊,那我们知道374这个是毫秒数啊,我们可以不看啊,就看这个9191374啊,那么对应着其实就是。第一个数据86374,它的五秒之后设置的就是这个时间。那他为什么这个时候触发呢?啊,很显然我你看前面这个数据到达已经到了90425嘛,然后在后面过一会儿之后,不就到了91374嘛,之后再来的数据,诶,那就是91407,这就是之后的一条数据了,一秒钟一个数据嘛,所以我们看到啊,所有的事件,不管是数据的到来还是定时器的触发,都是严格按照我们当前的时间来进行的啊,因为我们当前是处理时间以系统时间为准啊,这就是在key process方式里边。
13:15
去测试了一下处理时间的定时器。
我来说两句