00:00
在process function里边其实有一个遗憾,别的东西我们都看到了,就是有一个看起来最有趣也是最诡异的东西,那就是定时器我们没有测到啊,因为在这里边可以去尝试着,比方说啊,我们在这里ctx service,你可以尝试着去注册一个定时器啊,比方说这边给一个长整型的时间,比方说我直接给一个5万啊,那这个时间有点太早了,无所谓啊,我就注册一个,我们可以看一眼。运行一下大家看直接报错了。这个报错是什么意思呢?他说如果你想去设置定时器的话,只能在一个kid stream上面,所以一看这个报错信息,你就知道怎么回事了,那这个没办法,这没办法测,那我们只能去KBY,然后去创建一个。Kid process function啊,所以接下来下一部分内容就是所其实也是重点啊,大部分我们在实际使用的过程当中用到的处理函数以它为主,基本上就是它是用的最多的key process function,那这里边它就真正的可以使用所谓的定时服务了,可以设置定时器了,那之前我们都已经说过了,它主要定时服务里边的方法就这么几个,那如果要是分类的话,其实就是处理时间一类,事件时间一类其实就三个方法,一个是获取当前时间,一个是注册定时器,另外一个是删除定时器。
01:24
然后两种时间语义分别有一套啊,啊,那除了这个之外,另外其实就是有一个on timer方法在里边,我们可以去专门处理定时器触发时候的事情啊,那接下来我们使用的时候也非常简单啊,那就是先KBY,然后点process,基于k the stream去调process方法,里边传的就是一个key的process方式。接下来我们就具体来做一个测试。因为大家会发现这个定时器显然两种语义就会截然不同啊,就是有这个处理时间和事件时间,所以接下来我们首先做的是处理时间,Processing time time。
02:07
啊,那前面的这一部分我们就不用多说了。直接拿过来用吧。既然是处理时间,后面我们确定不用事件时间的话,那这个就不用分配了,对不对,ORA没用了啊,所以只用前面这一部分就好了。然后接下来env execute,然后把这个括起来啊,当然了,这里面大家知道,本来这个应该是一个stream source啊,但是它本身也继承了single output stream operator啊,所以你直接写这个single output stream operator也是没问题的啊,然后有了这个stream,接下来我们要测试,所以。RAR先K败当前的数据。其实我可以基于user去做一个分组,我不去分组也行,对不对,因为它只是要求类型必须在K的process function里边嘛,那我直接这写处其实也是可以的啊,那一般其实我们不会这么去用啊,还是给一个具体的,他说以user为标准作为这个键啊,做一个分组,然后接下来呢,就是直接。
03:15
低于kid stream里边就是new,一个key process function。然后我们看一下k process function类似的,它也是一个抽象类,那么它有三个泛型啊,这个类型就是KIO,那是一个是当前当前K的类型。I和O就简单了,输入输出嘛,之前process function,它就是IO,就是输入输出啊,那我们现在呢,多了一个K的类型,K已经确定了,就是string,然后输入也确定了event,那输出呢,这个我们可以自定义一下啊呃,那我们当前好像也不需要什么特别的输出,就是试一下嘛,所以给个string吧,给一个信息出来就完事了。然后里边必须要实现的一个process element方法。
04:03
这个process element方法里边我们就可以做一些操作,可以去注册定时器,然后去啊做一些调用了啊,那这里边我们首先先获取一下当前的处理时间,然后基于当前的处理时间呢,设置一个,呃,比方说我们可以设置一个十秒之后的定时器啊,因为大家会想到有一些操作我们可能并不想当前就去做,哎,我们是想,呃,或者说就是这有点类似于一个窗口的一种实现了啊,就比方说我当前先收集着数据,收集数据呢,我不要来一个数据就就统计输出一次,那什么时候输出呢?哎,我就设一个定时器嘛,就过一段时间,然后输出一下不就好了吗?啊然后只要当前还有定时器,那我就不要再去重复的再去注册定时器,只要没有的话,我就注册一个,然后过一段时间去去触发就可以了啊,所以这种情况大家会看到有点像窗口,但是跟窗口又不太一样,因为窗口如果要是滚动窗口的话,那应该是严丝合缝。
05:04
一个窗口接一个窗口,对吧,这个时间是连着的,那我们现在呢,这就没准啊,有可能是我来了一个数据,好,那那我现在这个。相当于开了一个窗口啊啊,然后这个点的时候,我统计一下这里边到底有多少数据,然后呢,接下来如果没数据,没数据我就不管了啊,就可能是等到下一个数据又来的时候,然后我再去统计,再开一个窗口统计接下来哎,这到底有多少数据,大家看这好像有一点像会画窗口的感觉啊,但是跟会话窗口又不一样,因为我是固定长度的,我就统计它接下来的这段时间内的到底有多少数据啊,所以就是有点像用这个定时机就可以灵活的实现游走在会画窗口和这个滚动滑动窗口之间的这些特定的这些定时需求啊,所以它使用就特别的灵活。好,就这么一个需求,我们先拟一下当前的这个时间,时间戳current time ts吧,啊,那这个基于ctx的时间戳是可以拿到的,但是这是当前数据的时间戳,我们现在处理时间的话,我们想知道当前的处理时间是多少。
06:13
那有同学说,那你直接system,直接调那个当前时间不就完了吗?啊在这个分布式系统里边啊,最好我们还是用这个flink给我们自带的提供的这些接口,那你直接用这个timer service不就可以了吗?对吧,这里面标准的方法,这不是2nt processing time嘛,把这个拿到,然后接下来我可以直接输出一条数据啊,大家知道就是当天有数据来了吗?我输出一下看的清楚点啊。就是数据到达啊,那到达时间。是多少呢?哎,那这里我们可以new一个time STEM,把这个时间戳包进去,Time STEM。目前的current ts进去。好,有了这个之后,那接下来我们还可以干别的。
07:03
注册一个。十秒后啊。定时器。这个注册的话还是要用到timer service register。Processing time timer啊就是处理时间的定时器,然后里边一个长整型的时间戳,表示触发的毫秒数啊,大家看跟我们之前这个用到的时间啊,时间戳的地方都是一样的啊,就是只要用到时间戳,还有这个automark里边的时间戳都是一个长整音的毫秒数,所以这里边我们就基于当前的时间加上。十秒,那就应该是十乘以1000毫秒对吧?啊,那这个是1万嘛,我们就用这种方式看得很清楚,就是多少毫秒。是注册了一个定时器,那注册定时器也没算完啊。目前这个闹钟啥时候响我是知道,响了之后要干啥呢,总得输出点东西吧,所以接下来我们直接可以out.collect看到这个on timer里边啊,也有一些消息,也有一些参数。
08:12
能获取到什么呢?On timever里边当然就没有数据了,因为它是靠时间触发的嘛,不是靠数据触发的,所以它的这个触发的是时间,所以我们能获取到的是当前的一个时间戳,其实就是当前定时器定它的时候的那个时间,对吧?到底是几点的闹钟放在这儿了,然后还有一个on timer contact。这也是一个上下文,这个上下文呢,它extend这个我们之前目前这个process element里边的这个上下文啊,所以这是一回事,它就是process function里边的一个上下文,它可以获取到所有的这些东西。Timer service time stamp还可以干什么?Get current key啊,获取当前的键啊,所有的这些都有,另外呢,它这里单独的有这个,呃,Get current key以及一个time domain啊,时间域的一个方法可以获取到啊,这个其实都不是特别重要了啊,我们关心的其实就是当前时间是什么,那我们来写。
09:08
定时器触发。触发时间。好,那这里我们再来一个time stamp里边包装的,那其实大家知道出发时间就是当前的time stamp。好,就是这个,你这里如果想要把这个这是user嘛,你可以把这个当前的user也写进来啊,那比方说我们可以加一下东西,对吧,这个比方说这个前面加一个啊。当前的Ctx.at key是当前的user对吧?啊,谁的数据到达,然后。接下来我们再来一个谁的定时器出发。空一格吧。就是我们定义的最简单的这样的一个定时器的测试,是处理时间的定时器,然后接下来我们来打印一下一些效果,因为这里直接输出的就是STEM嘛,就直接可以看到它的这个输出的信息。
10:10
运行。好,我们看一看,诶,大家看到爱丽丝数据到达对吧?啊,这是42分16秒,然后一秒一秒。诶,大家看到有一个定时器触发了,对不对,什么时候触发了一个定时器呢。我们停一下,大家看一下第一个定制机什么时候出发的呢?是。16:42:26494毫秒啊,它的定时器的时间戳是这个,那那大家会看到这是谁定的,那个定时器呢?爱丽丝定的对吧?那爱丽丝哪条数据来的时候定的呢?显然就是第一条数据嘛,十秒后准准的对不对?哎,十六四十二分16秒494毫秒,42分26秒494毫秒。
11:00
啊,这就是第一个定时器,那第二个数据是17秒478毫秒,那我们看到第二个定时器触发,当然也是。Marry啊,27秒478毫秒,准准的,因为它是处理时间,我们这里面的处理时间就是到这个时间马上就触发了。呃,所以你看这里边它的这个时间是完全按照我们这个时间顺序来的啊,处理时间当然就是按照这个标准顺序了,你看这个前面第一个定时器触发了之后,26.494毫秒,然后26.656毫秒呢,又有一个数据到达了,我们每每一秒一个嘛,啊然后呢,接下来27.478秒,又有一个定时器去触发,接下来呢啊,27.633秒又一个数据到达,所以大家看这里就是总是交错开的,因为一秒一个嘛,定时器刚好是设了十秒之后,那也是一秒一个啊,所以这个就是正好交错开,一个数据到一个定时器触发,一个数据到一个定时器触发啊,这就是处理时间非常典型的一个定时器的行为。
我来说两句