00:00
除了在创建表的时候,直接在DL当中去明当前的时间属性,另外还有一种方式呢,是直接在数据转换成一张表的时候,单独去做一个定义啊,那所以这种方式就要求我们先要创建流式的执行环境,然后先得到一个data stream,接下来呢,我们要调用from data stream方法,当然也可以是fromlo stream啊,然后创建一个表的时候追加参数,指定当前表里边的时间字段,时间属性。啊,那怎么样去指定当前的时间属性呢?呃,这里边如果我们定义的是事件时间的话,那把对应的我们想要指定成时间属性的那个字段后边直接加一个点row,也就是调用它的点row time方法就相当于。把当前的TS这个字段指定成了时间,而且是事件时间下的时间。
01:03
方式呢,需要注意的是,我们可以因为之前我们说过,在from data stream后边可以只传当前的数据流,后边可以不指定任何的字段,也可以后边跟上对应的字段名称,用Dollar符,呃,这种expression啊,这种表达式的方式来指定我们表里边的每一个字段,那这里呢,可以后边这个TS就是我们当前这个流里边的某一个属性,哎,那这样的话。当前这个流里边我们是po po类型,或者有可能是其他的一些类型,假如说我们对应的是有这个字段的话,那点row time,那就相当于直接把这个字段当成时间属性,那假如说没有呢。这里我们整个这个流里边的本身的数据类型是二元组,就只有两个string,那就是一个user,一个URL,哎,那如果要是这样的话,那相当于我们在这里直接TS.ro time也是可以的,这表示什么呢?表示额外的去创建一个新的字段,这个字段就是当前时间。
02:16
事件时间语义下的时间属性字段,那我们可能会有点奇怪,这不对呀,这当前流里边的每一条数据只有user和URL2个属性两个元素啊,那当前就是一个二元组,你哪来的第三个元素去指定成当前的时间戳呢?要注意,我们当前进行这样的一个定义的前提是。必须在前面就应该。定义好了时间戳的提取方法和水位线的声明方法,所以我们看这里,看起来这个方法比较简单,连水位线的延迟时间都没有,诶,那水位线怎么生成呢?
03:02
水位线和时间的提取都在我们前面这个里边,It stream里边就应该调用STEM and方法就应该已经全部定义好。啊,所以这里边需要注意就是我们当前相当于只是在之前已经定义好的基础上,把对应的时间戳拿出来,单独又创建了一个逻辑上的时间属性字段而已。而并不是重新去做了一个定义。啊,那当然了,在data stream里边是没有所谓的时区的概念啊,所以使用这种方法定义时间属性字段的话,那所有的时间都是time stamp类型啊,然后这个值就会时间值就会被当做UTC的标准时间来使用。那所以两种方式,一种就是。数据流里边本身不含着当前我们要声明的对应的这个时间属性字段啊,那我们是先要做。
04:04
时间戳的提取,Auto的声明定义好了之后,在流定义好了后,接下来我新创建一个单独的字段,指定它是当前的事件时间与一下的时间属性,那或者呢啊,就是之前我们有某一个字段就叫做TS,现在我们把对应的这个字段第三个字段直接当成当前的时间属性,用这种方式的话,这相当于是把我们的这个长整型的第三个字段。把它重命名为TS,而且把它的类型直接就转成time类型了,它就不再是长整型了,所以要注意,这相当于是覆盖了这个字段的定义。我们接下来可以在代码里边也来看一下它的具体的实现方式。啊,那第二种方式,这就是在流转换成表table的时候。定义时间属性。
05:02
所以这个前提是我们首先要啊,从之前的这个env里边先去创建一个,创建一个流,所以要去source。那这个过程其实跟我们之前讲的过程是一样的啊,这里就还是为了简单起见,我们不再去自定义一些呃,比较比较复杂的一些属性字段了,呃。我们这里就不再去自定义一些测试数据了啊,直接可以用之前的click source啊,那当然了,后边必须要有。因为当前本身在这里。不涉及到太多的时间的延迟嘛,我们本来就是升序。直接是生成的,都是升序数据按照时间去排列的,然后接下来啊,里边当然还应该有一个时间戳的提取方法啊,这个必须要有。
06:00
你有一个Z啊,那当然了,这里我们应该在前面定义一下当前的类型。那后边这里也是event的类型。接下来要实现的就是怎么样去提取时间戳的方法了啊,当然这个我们都轻车熟路,就是element点把它拿出来就可以了。把这个都得到了之后,这就是我们当前的这个click stream,或者叫event stream先得到。接下来我们当然就可以把这个转换成表了,那接下来我们是需要把当前的click stream要转换成table的话,那需要调用。Table。我们所要定义的所有的表里面的列字段了,啊,那当然当前我们需要的还是直接写一个表达式expression。
07:00
首先第一个啊,我们把对应的这个呃,API。table.api下边的这个引入进来,首先我们按照对应的字段顺序,首先是一个user,然后后边。是URL。那当然了,接下来就是时间字段了。首先我们想到应该是一个time step。啊,但是如果说我们还想在当前的所有数据里边保存着这个长整型的time STEM的话,哎,那后边我们就不应该直接把它定义成肉态,而是单独的再去创建一个新的字段。比方说这个字段,我们还是直接叫做e time。它直接去调用一个就可以啊,所以我们要注意前面在定义流的过程当中,我们通过这样一个time stamp and water maps这样一个方法啊,Extra extract timemp里边其实已经提取出了当前的时间戳,通过这个方法已经可以得到时间戳了,在这里只是相当于把这个时间戳又拿出来,单独的定义了一个当前的时间属性字段,叫做ET而已。
08:16
所以跟前面的time可以没有任何的关系。好,那当然了,我们知道在呃,真正的fliq里边,这个time stamp最好还是不要叫这个名字,我们可以把它做一个重命名,如果真的想要这个字段的话,我们把它重命名成TS,这样就可以了。这样得到的就是一个table,我们可以把它叫做table。这就是我们完整的一个处理的过程啊。那当然了,如果说我们想看一下当前这张表里的结构的话。我们知道。Q,提供了一个叫做print这样的一个方法,那我们可以直接运行一下,看一下当前这表它的结构是什么样,把它打出来。
09:00
就相当于是我们看一下当前这个表字段,每一个列字段的声明,它的类型是什么?我们可以看到user urlts,还有一个ET,所以它并没有覆盖,前面的TSTS是一个类型,而后面的ET呢,是一个TIME3类型,而且它后面还有一个标注,这是一个肉碳当前事件时间语义下的时间属性资格。知道了事件时间与一下怎么样去定义当前的时间属性字段,那另外一种时间语义处理时间就更加简单了,因为我们知道处理时间它本身就是系统时间嘛,根本不需要做时间戳的提取和生成水位线啊,那这个过程相当于我们就是额外声明一个字段,然后保存当前的系统时间就可以了。啊,当然类似的也有两种定义方式,一种是在DD里边定义,那DD里面定义的方式就是直接多增新增一个字段TS,然后后面只要as pro time就可以了。啊,那pro time我们当然知道,就是processing time的一个缩写,它其实是一个函数,这里需要注意,当前的这种定义的方式其实是以计算列的形式。
10:18
定义出来的,什么叫做计算列呢?这是fli CQ里边引入的一个比较特殊的概念,就是说用一个as语句。其实我们看前面。我们提到。事件时间语义定义。时间属性字段的时候,相当于也是用到了as这样的一个定义,只不过是在定义w map的时候有as,所以as其实表示的就是一个用后边的计算结果。来得到一个原本不存在的列,这样的一个过程,呃,那在这个过程当中就可以利用原来已经有的列啊,或者像我们这里。处理时间这里已经有的,呃,啊,当然这里面没有涉及到已经有的列啊,只是用到了系统内置的一些函数,当然还可以用到各种运算符,所有的这些都可以结合起来使用,然后相当于有一个表达式,得到一个最终的值,那这个值呢,接下来就新增一个字段,添加到当前的。
11:23
表的定义当中啊,那在这个过程当中,其实就是新增了一个时间属性字段,这是当前的处理时间与一下的时间属性。那另外还有一种方式,当然就是在数据流转换成表的时候去定义了,这个过程也非常简单,也是额外新增一个字段,只不过这个时候后边加的是不是time,而是pro time,这样的话,调用这个方法新增的TS就是当前的事件时间余下的时间属性字段。
我来说两句