00:00
与之对应的,我们已经知道怎么定义处理时间了啊,那当然接下来我们就可以设置事件时间语义,然后去配置事件时间,那其实事件时间我们看到也差不多对吧?呃,这里边其实就是我们需要,如果说啊,我们想要去处理这个乱序数据,处理这个迟到事件的时候,我们需要哎,定义成事件时间语义,然后接下来呢,这个时间就不再是由任务直接从机器本地的这个呃,运行时间里面去提取了,而是要从数据里边提取时间戳来表示我们当前事件时间的进展,那同样定义它的话有三种方法,跟前面对应也是三种,有一种最简单的,哎,由这个data stream转换的时候,转换成表的时候去指定,对吧?另外还有两种就是我们直接连接外部系统创建表的时候,一种方法是我们在connect的方法里边那个描述器定义的时候,它的那个table sIgMa里边去指定。另外呢,就是在创建表的。
01:00
BDL,我们给定他的那个STEM的时候去指定对吧?这两种方法都是可以的,那接下来我们依次给大家做一个讲解啊,那首先就是转换成流的这种方式,转换成流的这种方式特别的简单,大家看这里边怎么样呢?直接就是我们之前不是应该有一个时间戳吗?对吧?有一个时间戳,所以如果你想把它直接作为当前的这个,呃,就是我我我们当前的这个时间字段的话,你就可以直接定义它,点row time,直接可以把它定义出来,或者还可以怎么样呢?也可以在后边追加一个字段RT,对吧?这个代表row time的意思啊,注意后面调的是点row time,不是1TIME啊,点row time就是当前这一行数据的时间,对吧?因为我们后面做表操作的时候,一条数据都是一行嘛,所以这里边定义的就是啊,之前那个是pro对吧,这里是你可以去追加一个时间特性,专门表示这个也可以。
02:00
哎,直接就用之前的这个time Sam某一个字段指定它是当前的这个时间字段啊,这是关于这个data stream里边直接指定这种方式啊,我们就不在代码里边给大家再详细说了,那另外还有一种方式呢,是哎,就是在定定义这个table schema的时候,我们这里边可以给对于某一个field的一个特性一个字段后边加上点,哎,大家看这个点row time就不像那个点pro time那么简单了,因为我们说处理时间的话,你直接告诉说它当前是处理时间,那我们底层直接获取机器时间就完了嘛,那这个row time呢,是不是你还得告诉我到底它的那个时间字段从哪里来啊?诶,这里边大家就看到了,里边我就得去new一个row time,这里边得有这样的一个就是数据类型啊,就叫做row time,作为当前这个row time方法的一个参数啊,所以你看这个就很复杂对吧,你如果要是我们本身。
03:00
那当前的这个script没有实现对应的接口的话,他肯定处理不了这样的东西啊,对吧?这是代表我们要怎么样去提取,从某一个字段里面提取当前的时间戳啊,大家看这个row time怎么样提取呢?Time step from field对吧?从某一个字段这个域里边去提取时间戳,哪个字段呢?Time stamp,对吧?从这个字段里边去提取,然后接下来还得指定,既然是这个事件时间嘛,大家想到是不是还得指定mark呀,因为当前我们这里边你直接就从连接外部系统直接去读取数据了,我们现在没有时间戳的定义,没有watermark,所以说这些都得我们自己来配,我指定了这个从哪里去取这个时间,然后后边,诶,这里边再定义一个这个water mark periodic bonded,大家可以看这个字面的意思,就是说什么,就是周期性的生成water mark,然后有一个延迟时间,对吧,延迟的这个。
04:00
界限,我们这里边给的就是那个类似于我们呃,Data stream里边配置water的时候一个好秒数,对吧,这就相当于延迟一秒生成这个water mark周期性的生成,好,然后你看我这是插在中间给了一个这个就是当前的这个time stamp,我我把它定义成当前这个roadtime了,对吧?啊,后面还有这个temperature,这是这样的定义,当然我也可以单独再去专门的定义一个这个呃,RT对吧,一个field就叫RT,这个也是完全可以的,因为你看到它其实跟当前这个time Sam没关系,对吧,你看这里边RO time它它干什么,它是单独再去提取这个时间戳的,你是在这里边才指定我到底要怎么样去提取这个时间,所以说并不是说你这里边基于哪个field把它指定成时间字段,它就是时间字段了,对吧,不是这样的啊,不是基于它去做的,相当于我们是提取这个时间字段之后生成,然后把这个字段给覆盖了,相当于这样的一个做法,对吧,还叫了。
05:00
个字段的名字而已,那有同学可能就会觉得疑惑,那之前我们为什么这个转换成硫之后就没有watermark的那个定义了呢?也没有去指定到底从哪里去提取呢?啊,难道说我这里边呃点这就是直接从里面去提取吗?不是这样的啊啊,这个我们还是回到代码里边,再结合代码给大家看一眼,就是如果在代码里边我们直接用这个流式转换的这种方式来定义这个事件时间的话,那大家想这里边我就需要把这个全局的事件时间设成呃,当前的这个时间语义设成事件时间,然后后边呢?诶,大家记得是不是我在这个读取数据源之后,是不是就应该要有一步操作,是ign time stamps and watermarks呀,对吧?哎,所以大家看并不是说我在后边不去提取,而是什么在前边我做这个流转换的时候,Time stamp时间戳和。
06:00
Automark都已经在流里边已经有了,对吧?哎,所以接下来我就只要指定一下我现在的这个事件时间字段是啥就可以了,那当前的这个,比方说我们前面还是用这个乱序的,大家还记得吧,Bounded out of orderne啊,Time step except对吧?这里边我们给一个当前的这个延延迟的时间,我们一般给那个最大乱序程度嘛,比方说我随便给给一个的一秒钟啊,然后后边这里边我们指定的就是怎么样去提取时间戳对吧?啊,比方说我要求他这个乘以1000,这个这个都是完全可以做到的,然后后边假如说啊,我把这个去掉,我直接这个,呃直接点大家看可以直接点row对吧,指定它就是当前的这个时间字段的话,那现在我们这个到底到底它里边是是什么样的东西呢。
07:00
哎,大家可能会想到,你如果要是说我我这里边直接以这个time stamp作为当前的这个row time的话,那会导致一个什么效果呢?呃,我们输出一下给大家看一看这个最终的结果啊,我们就可以知道这个到底这里边做的操作是什么样子了,大家看到这里我们已经运行输出了这里边的结果对吧?哎,你看到现在就是少了一个字段,哎,我们之前不是单独定义了一个字段吗?现在是直接就把这个time step作为当前的,呃,就是我们的这个时间戳了对吧?Time Sam直接输出了啊,然后有同学可能就会想了,如果说我是以这个time Sam作为road time的话,你看它的数据类型是什么?Time sam3对吧,毫秒数啊,那你假如说我们之前大家还记得这个数据吧,这个数据我们说本来这应该指代的是一个秒,你如果把它当成毫秒直接做转换的话,会导致什么?哎,最后我们得到的那个数据就变成了一个1970年附近的一个时间,对不对?诶是不能直接做这个转换的,但是我们看到这里输出的结果它是1970年附近吗?诶,不是啊,这2019年啊,诶所以看起来我们得到的还是正常的,那这里边我们真正的这个时间字段,它里边的值是什么呢?其实还是之前我们已经提取出来的那个时间戳,对吧?之前我们跟大家说过,就是在这一个,呃,所有的数据里边一个一个流进来,然后经过这个asign time and watermark这个处理之后,相当于什么呢?是不是相当于我们就在这个数据上面多加了一个time stamp的这样这样一个标签啊,一个戳啊,对吧?然后基于这个戳,我们后边再去插入那个watermark对吧,哎,是这样的一个处理的过程,所以说现在其实我们直接这个数据来了之后,我就能提取出你之前指定好的那。
08:53
个乘以1000之后的那个,呃,真正的那个毫秒数,对吧?所以这里边我指定这个点的时候呢,还是其实是把我们那个时间戳放拿出来放在这里作为这个来使用了,而这里边的time呢,相当于只是覆盖了之前那个time stamp的值而已,对吧?啊,只是相当于借用了我们这个字段的名称而已啊,就是还是叫这time stamp放在了这啊,大家就是可以把这个东西稍微的再考虑一下,看看这个底层到底是怎么做的啊,所以你看这个通过流市的转换,大家如果已经熟悉之前这种做法之后,后面这里就简单多了,对吧?呃,就不需要再去定义,哎,我到底怎么样去提取这个,呃时间字段怎么样去做这个watermark的定义,前面我们都做完了,如果说你不做这个流式读取的话啊,那就是我们说的,接下来是不是就得自己单独定义了啊,所以接下来你看我们在这个cable Emma里边啊,这里边如果。
09:53
要单独定义的话,那怎么办呢?哎,那就是我们前面说的,你里边定义某一个字段是row time,然后里边要传一个row time对吧?这个row tab里边你再去定义从哪个字段里边去提取,然后后边呢,再去定义这个watermark生成的这个周期对吧?用哪种方式生成water mark,然后这个延迟是多少啊,我们现在是周期性的生成,当然大家一看这个这个调用,那就肯定还有别的生成方式,对吧?啊,感兴趣的同学可以下去之后看一看,它就还有那种类似于我们的间断性生成的这种方式啊那那另外就是说,呃,这里边如果把它定义好了之后,我们在后边做处理的过程当中,所有用到的,呃,当前的这一个时间就全部都是这里的这个事件时间了,对吧?啊,这是关于table sIgMa里面的指定,那还有另外一种方法,前面说过DDL里面也可以去指定这个时间戳,对吧?哎,这个就更麻烦一点,大家看一下这个。
10:53
这个这个定义的过程啊,首先前面字段ID啊,这个就不说了啊,然后有一个time stamp,我们这个叫TS,因为大家知道time stamp本身是,呃,这个CQ里边的关键字嘛,对吧,我们把它定义成TS,然后另外还有一个temperature,这是我们的三个基本字段,先定义在这儿,然后接下来要定义这个事件时间,那是不是你就是首先得有一个时间字段,表示当前是真正的那个就是事件时间,对吧?另外还得有一个whatmark的定义,大家看是怎么定义的呢?哎,首先我定义一个RTRT又是这种as什么的这种表示,对吧?之前我们说那个计算列是pts啊,就是一个pro,那现在这个RT是直接S一个RO不就可以了吗?大家注意不是的,它是怎么样的RTS,我这里边是直接对于这个TS,这不是我们那个时间戳嘛,对吧,然后from unix time,然后把这个参数传进去。
11:53
From unx time,这也是CQ里边的一个函数啊,后面我们会给大家再专门讲函数啊,呃,这个呃也也非常的直白,就是from unx time,就是要把当前的一个一个整数,当前我们这是一个秒数对吧,把它直接转换成一个unix time,就是年月日十分秒的那样一个格式对吧,一个一个字符表达的一个格式,先把它转换成这个,然后再把它转换成two time step,因为大家知道最终我们在CQ里边要的这个数据类型不是长整型,不是big int,而是一个time step,对吧?呃,然后这个转换的过程当中就还得还得二重转换对吧?先转成unix time,然后再把它转换成time step,稍微的绕了一下,这是关于这个RT的这个定义啊,这个字段,这是我们当前时间的一个字段,然后接下来呢,诶,关键在里边,其实我们不是要指示事件时间的时候,我们是需要这个时间出来指示吗?其实不是的,时间戳只是当前的一。
12:53
个字段而已,我们关键是要以这个watermark来指示当前事件时间对吧?哎,那water mark怎么定义呢?哎,大家看,又是这样一个特殊的表达,Water mark for rt啊,就是我要基于这个RT去创建生成watermark对吧?啊,然后as as什么样的一个表达呢?就是RT减去INTERVAL1秒,INTERVAL1秒这是CQ里边的一个写法,就是表示一秒的时间间隔,这是一个完整的表达式啊,这是整体来讲它表示一秒钟,所以它就是说,哎,我现在要基于RT去创建生成water mark,怎么样去生成呢?啊,就是用当前的RT减去一秒钟对吧?啊,就相当于我们延迟一秒生成water mark啊这样的一个机制啊,所以这个稍微有一点奇怪是吧?哎,所以有时候给大家就看大家怎么样去使用了啊叭,较推荐的方式可能。
13:53
排式,呃,如果你要是不熟悉这种,呃,这个DDL里边这种写法的话,那用我们这个connect之后在stemma里边去写,这样可能会表达更加的明确一点,明晰一点,那或者你更加简单粗暴,就是我直接用流对吧,要先放在流里边,后边再转换就完事了,这是关于事件事件的定义。
我来说两句