00:00
当然我们想到了,就是一般我们要用flink去做这个事件处理的话,更关心的其实是事件时间,对,所以我们知道这个处理时间定义,那与之对应还应该有事件时间的定义time,那time们大家知道这里边我们要处理乱序的话,那是不是要结合,就是相当于我得从数据里边能够提取出时间戳,对吧?另外是不是还能还需要能够设置设置一个watermark,比方说我延迟一段时间对吧,等待那个乱序数据到达,这样的话就可以保证结果的正确性。那同样其实跟前面那个处理时间是差不多的啊,也有三种指定事件时间的方式,一种方式就是data stream,没呃,Data stream转换成表的时候,我们这里边直接去指定,那另外还有两种方式就是在一个是connect指定那个tablechema的时候去指定,还有一个就是创建表的DDL里边直接去指定。接下来我们分别看啊,最简单的或者说最推荐大家使用的还是直接,就是我们在流里边直接把那个对应的事件时间,时间戳和watermark都生成,都提取出来,这样的话大家可能是最熟悉也最方便的,后边大家看是不是就直接在这个转换成表的时候定义一个当前的时间字段就可以了,就像前面处理时间是一个PC啊,Pro processing pro time一样,这里边指定的是一个。
01:27
大家注意这里面指定的是row time对吧?这里面直接针对一个字段,直接指定row time就可以。大家要稍微注意一下,在指定这个事件时间的时呃时候,其实这里面有两种方式啊,我可以直接time stamp.row。那大家想这是什么含义呢?大家要注意啊,这个如果我这么指定之后,Time stamp就已经不是我们最初数据里边的那个。以秒为单位的时间戳了这个我们可以试一下啊,大家看当前我这个当前的这个是处理时间,呃,时间语义对吧?那假如说我要做这个视线时间羽翼的话怎么办?是不是至少全局应该先设一下呀,对吧?Set。
02:15
Stream time character characteristic,然后把这个time characteristic even time先设出来,然后如果说我先读成这个流的话,那接下来是不是在这儿要去做一个assign Sam and whatmarks啊,这里边去new一个,还记得我们处理那个乱序数据吧,对,Bounded out of orderness extract,然后在这儿去提取当前的时间,戳element.get time去乘以1000,然后接下来。这里可以给一个时间的,呃,就是waterwa的延迟时间对吧,比方说给一个两秒钟直接放在这儿,这就是我们前面的这个基本的用法,那到这儿的话,如果说我这样定义pro time的话,这还是处理时间对吧,这还是处理时间啊,那所以我接下来可以我就把这个先处掉吧。
03:10
接下来我就要单独指定当前的这个是一个roll time,当然指定了roll time还可以sts给个别名对吧?好,那大家注意啊,这里边的这个TS还是我们一开始数据里边的这样的一个状态吗?还是这样一个一个长整形十位的这个秒数吗?哎,大家注意啊,这里边就已经不再是之前我们最初的状态了,这里边我给大家看一下执行的结果。哦,大家看到现在它的类型是什么,已经变成了time sam3,然后roll time了,对吧,这是不是就跟我们之前追加的那个pro time是一样的呀,对吧,类型是一样的啊,然后你这里边直接输出的话,你看到它是什么。
04:05
是不是已经直接变成了这样的一个时间啊,Time Sam对不对?呃,输出的时候time step to string的话,输出的就是这样一个年月日十分秒,然后你看到后面是不是还带毫秒啊,这个毫秒当然我现在都是零,那是因为我之前都是整秒数嘛,所以这里边都是一个零啊,所以这就是当前的这个定义啊,它其实是直接把这个字段是不是就改成我当前的那个roadtime了呀?那大家注意,这个周time到底是哪来的呢?那你既然都把我这个直接覆盖掉了,对,大家知道这时候time是不是在前边我这个流里边已经指定了提取时间戳生成water mark,是不是现在我已经知道当前时间戳和water mark是什么了,诶,所以接下来我就不需要单独指定了,对吧?直接rolltime的话,那我其实把这个字段直接就覆盖掉了,当然你如果不想把它覆盖掉的话,也有另外一种写法,对,大家看到就是我可以比方说我把这个去掉啊,这是time time stamp,对吧。
05:08
啊,当然这个time stamp,我可以还是sts,因为在CQ里边可能会不太好用,对吧,它跟一些关键字有冲突,那我也可以在后边直接追加一个字段,比方说我这个叫RT.ro type,那它就是当前这个RT,是不是就是我当前的这个事件时间啊,现在大家再执行一下的话。可以看到我现在是不是又应该追加了一个字段变成四个字段了。而且大家看到现在。上面这个TS是不是还是原来的样子,对吧,还是啊,还是一个长整形,然后现在追加了一个RT,它是一个row type是一个这个。Time sam3的类型,所以大家看前面这个数据是不是还是原封不动的样子啊,这个长得跟我们之前那个pro time的那个输出形式是差不多的啊,只不过这是当当时的我们给的那个事件时间,对吧,这不是当前的机器系统时间了。
06:07
这就是关于这个row time的一个定义,这种方式是最简单的,大家也最容易理解。那当然了,与之对应还应该有两种方式,就是可以在定义connect连接到外部系统的时候,直接在表的那个STEM里面指定啊,那之前我们是直接点pro time就可以,那现在是不是直接在某一个字段后面点row time就可以啊,但是大家注意row time是不是就没那么简单了?里边是不是必须你得告诉我,对时间戳,因为现在是不是没有我们那个流提取时间戳和生成automark的那个过程了,所有这个过程都得自己定义了,对吧?哎,那大家看到这里边就是我去里边要new一个row time,然后指定点time step from field这啥意思,是不是从字段里边属性里边提取提取当前的时间戳字段对吧?时间戳属性,然后后面是点water mark periodic bonded啥意思?
07:04
是不是,哎,这就是我们说的是周期性的生成water mark,然后按照一个这么长时间的一个延迟时间来生成,对吧?诶,我这里设置了1000的话,当然就是一秒钟自动生成周期性的生成一次water mark啊,这就是在这个skima里面的定义啊,相当于把我们前面的那个asign time Sam和water mark也也实现了一下,是吧?啊就放在这儿了啊,那另外还有就是如果在DDL里边直接写可以吗?也是可以的,只不过这种写法呢,就会更诡异一些,大家可以看一下啊,这里边我们的写法还是创建表的时候,前三个字段idts,还有temperature,然后接下来呢,追加一个字段叫RTRT呢,As之前我们那个直接就那个time了,对吧?呃,直接就是as pro time啊,这里边还不能,这里边我是要做一个two time stamp,这要干什么呢?
08:01
其实本质是不是就是要从这个里边要提取时间戳二,所以大家看最终我的那个时间字段要的是一个time stepmp3类型,对吧?所以这里边我必须最后是一个time stamp,然后这个转换怎么转呢?大家看这个就稍微麻烦一点啊,我是先TSTS,大家知道这是一个长整形的数码,它还不能直接two time step没没办法转对吧?我是需要先把它转换成一个unix time,就是把这个当成一个标准时间的,呃,当当前的那个毫秒数时间戳,然后转换成当前的UTC标准时间对吧?所以这里得到的是一个CQ里边的time类型,然后再把这个time类型转换成time stamp CQ里边time sam3类型,对吧?哎,就做了这样的两重转换,当然大家知道这样的调用的话,这也是计算列,是不是也必须是blink才支持啊,啊,这个是没问题的啊,然后除了这个之外还没完。
09:02
还得有watermark的呀,呃,Watermark怎么定义呢?Watermark for rt,大家看这是不是就是指定当前,以它作为时间戳对吧?以它为基准,然后生成watermark watermark for rt,然后as as是不是就是表示当前生成watermark的一个机制啊?那基于什么来生成呢?大家看基于当前RT减去INTERVAL1SECOND,是不是一秒延迟时间生成watermark呀?哎,这就是这个CQ里边的这个写法啊,这里边的这个interval,然后单引号1SECOND,这就是固定的在CQ里边表示一秒这样的一个时间间隔的固定写法,对吧,就是一秒就这么来写,那如果要是一分钟,大家能想到怎么写吧,还是一样,In t赢,赢起来1MINUTE对吧,那就这么去写啊啊有同学说啊,那in t赢起来60SECOND啊,当是那个也可以,对吧?大家知道是怎么写就可以了啊这就是我们用不同的方法可以定义出事件事件。
我来说两句