00:00
下来我们来讲一讲弗link里边比较特殊的这个特点啊和这个性质,比方说这个时间特性,时间特性啊,之前我们讲过了啊,在flink里边有各种各样不同的时间语义啊,所以说对于这个不同的时间语义而言,后边的操作啊,跟时间相关的操作,那其实就会基于不同的这个,呃,相当于这个定义的时间字段,对吧?那对于之前我们讲到的dgam API而言呢,只要在全局,在外边我们那个环境里边设置一下实现语翼,然后假如说你用到的这个事件时间的话,我们需要在后边再有一步,这个基于数据的一个分配时间戳和water mark,对吧?做了这两步操作的话,后边我们就可以正常用这个,呃,所有的时间表达,那事件时间里边都是以这个water water mark为准的,当时我们是这么去说的啊,那对于这个表操作而言。
01:00
Table API,或者说我们直接写CQ而言,这里边用到的这个时间怎么样去定义呢?呃,有同学可能想那还是一样吧,你直接在外面那个呃,定义一个呃,时间语义对吧?呃,当然大家想到这个肯定是要先去定义的,然后另外呢,呃,之前我们是那个3TIME STEM watermarks,那现在怎么办呢?啊,有同学说,那你还是time ands嘛。但是大家注意啊,那种方式我们是基于流去做的一个转换操作,是基于data stream的,所以当然可以用这种方式了啊,你用这种呃,Time盘的X的话,相当于在事件时间语义下指定了当前data stream流里边的时间字段。啊,那你如果要是做了这种操作后边把它转换成表,那是不是还应该在指定一下表里边的那个字段啊,因为大家注意啊,流里边的那个时间字段,我们知道指定之后,它是以watermark的形式,类似于插入到数据流里边的一种特殊数据。
02:02
那你做表操作的时候,他又不处理watermark对吧?表我只关心这个数据来了一条之后,我这个动态动态表怎么关,怎么去做转换,所以对于这个表处理而言,我们要另外的设置一个字段,专门用来表示当前的时间啊,或者说时间戳对吧?呃,而不是像这个流处理里边,相当于我直接附加了一个字段,直接加入到流里边去做操作了。表里边是你要明确定义出来的。所以呃,这里边表给我们提供了这样两种不同的方式啊,一种是我可以是就是就是这个给一个逻辑上的时间字段啊,就是这个一种方式是可以是当前表里边已经有的一个属性,已经有的一个字段啊,然后我指定当前这个字段就是呃,我要用的比方说事件时间或者是处理时间的那个字段。
03:00
另外还有一种什么方式呢?因为它本身是逻辑上的时间字段嘛,我还可以单独的再去新增加,创建一个新的字段,表示这是当前的时间戳。啊,所以这种方式跟之前这个旅游处理里边定义的方式呢,可能稍微有一点不一样啊,但是呢,整体来讲也还是比较简单的,大家把它当成一个表里边的这个特殊的字段就可以了。啊,所以说这个时间属性,它在我们使用的过程当中,它应该是每个表STEM的一部分啊,你要后面如果要用到它的话,就必须把它加入到表里面去。如果要定义之后,它就相当于是一个字段了,那如果说呃,这个它的类型是什么呢?它本身这个类型是就是time stamp类型,对吧,你定义了之后呢,他的行为类似于常规的时间戳,可以访问,而且可以进行计算,但是注意啊,如果对他进行了转换计算之后,它就只有常规时间戳的那个属性了,就没有我们后边的这个时间相关的属性了。
04:01
啊,所以这个大家稍微了解一下啊,稍微这个知道就可以,重点我们还是要看后边它到底怎么用对吧?呃,这里边最简单的一种方式,其实就是。就是把按按照我们之前的那种方式啊,先定义一个data stream,然后呢,基于data stream,我们去做assign time stamp and waters,这样的话就有了这个对应的时间戳和water嘛,然后呢,后边直接把它转换成一个流,然后再呃,把这个流转换成一个表,转换成表的过程当中指定一个新的字段,表示当前的事件时间或者是处理时间就可以了。所以大家看这个写法其实非常简单啊,前面就还是那个呃流的操作啊,把流那个呃转换过来,然后map成样一类,呃,我们做一个这个分配时间戳和watermark,后边呢,还是把流from data streams转换成表,转换的过程当中前面字段还是对应对吧,后边再追加一个,比方说我叫PT这个字段,新的字段,然后怎么样呢?点pro time就是表示processing time对吧。
05:08
这这个点pro time就是表示我追加的这个字段,它当前是这张表里边的处理时间,时间属性啊,多了这样的一个属性,这个是最简单的一种应用方式,那我们在代码里边可以先看一看啊,大家看一下这个代码到底应该怎么做好,我们新建一个。Object,这个就可以叫做time。啊,当然大家知道,就是关于time的这个应用,后边肯定要是为了方便做时间管理,呃,主要是要做这个窗口操作嘛,所以我们这个是放在一起写吧,这个就叫time and window test,对吧。好,然后里边内函数首先啊,要引入的东西其实都是一模一样啊,首先我们把那个环境先引入对吧?呃,当前把这个流处理的环境和这个表环境通通先引入。
06:06
这里边我们需要的这个影视类型转换啊,下划线两个都引入,然后这里边既然是要做这个事件时间,比方说啊,要做时间语义的定义默认,如果我们什么都不配的话,当然是处理时间了,如果现在我们以这个事件时间为例啊,如果要做事件时间的话怎么办呢?当然在外边先得把这个时间语义要定义出来,时间特性要定义出来。Time character even time啊。这个还是一样的啊,先把它定义出来,然后后边我们已经做了这个创建表环境,做完之后呢,按照这个最简单的方式还是要读取数据源,先把它读成一条流,然后做map转换成样例类,然后定义对应的,呃,就是我们想要做的那个分配时间出和watermark的方式,对吧?啊事件时间语义嘛,所以接下来这个操作跟大家之前熟悉的过程是差不多的,我这里还是直接用这个example这里啊。
07:07
大家看呃,从文件读取数据对吧,Input stream,然后map成样一类类型,截止到这里为止。把这个sense reading样衣类类型,先把这个获取出来啊,然后后边直接去分配时间戳和water mark,然后如果事件时间大家如果是乱序数据的话,你可以去new一个,还记得那个类吗?Bounded of bounded out of orderness对吧,Time stamp instructor啊,然后同样下边再去写这个处理的过程当中还是啊,按照我们之前的是什么就提取什么啊,时间戳提取出来,然后它如果是秒数的话,我们要的是一个毫秒乘以1000,这里边你可以给一个。就是延迟触发的那个时间对吧?Time这里边引入window in time.time然后比方说我给个一秒吧,简单一点啊,Time赛子一这样的话就完成了,这是我们之前这个对于流的这个操作,然后后边呢,得到一个流之后,呃,那接下来需要把这个流转换成表对吧。
08:11
所以接下来这个啊。将。流转。换成表,然后直接定义时间字段。那当然这里边时间字段就是可以是处理时间,也可以是就是even time事件时间啊,那所以这里面我们定一个还是sensor table啊。它是一个table类型。把这个table类型引入这个我们用flink table api.table啊,然后这里边用这个en对吧,点from data stream,然后里边我们要传的就是当前的这个data stream,注意后边哎,指定当前的这个字段,比方说有这个ID对吧,我可以基于这个呃,本身的这个数据类型来来做一个本身的这个字段名称,做一个这个指定啊,然后后边我可以有这个temperature对吧,Temperature,然后后边还可以有这个time stamp。
09:13
Time。哎,然后大家看这个该有的字段我都已经列在这儿了,然后我可以再追加一个,比方说我现在要这个处理时间语义,那大家会想到处理时间在当前,其实跟这个时间戳就没关系,对吧,跟这里边提取,你如果处理时间的话,我我也不用去指定这个时间语义默认就是嘛,后边也不用去分配时间戳,这些其实都不必要啊,然后这里边指定处理时间的时候,直接大家看本来没有的字段,我直接定义出来,这就相当于新增一个字段,然后后边处理时间的话要点。要把这个写入,那后面在这个表里边啊,大家可以把这个就是后面这个表类型再打出来,对吧,大家知道这个可以直接to stream呃,然后我们这个一个肉类型。
10:00
把这个引入。Flink types roll,然后直接把它做一个这个print,呃,这里边我们可以直接把这个数据打印输出,当然了也可以怎么样呢?我直接可以这个有一个方法给大家说一下啊,可以直接print STEM啊,这就相当于是什么?就是可以直接把当前的这个table的架构啊,Stemma直接输出出来,大家看一看它当前是一个什么样的表,对吧?啊,然后我们把这个该写的写入进来啊,这个当前是time and window test。Job,然后我接下来这个运行相当于没有任何的处理操作,对吧,只是不做输出,只是把这个,呃,当前的这一个表,环境表的这个架构啊,要做一个打印,看一看这个当前它变成了一个什么样子。好,大家这里可以看一下当前的这个表的架构是什么样的,哎,表的架构其实就是有四个字段对吧?啊,然后本身这个ID temperature time,这就是之前我们在样例类里边已经定义好的那个类型啊,String double,然后长整型换成一个big int,那最后还多了一个PT字段,它是什么呢?哎,它是一个time stepmp3对吧,而且大家看它有一个特殊的含义是proty,呃,这是一个关键字啊,就是在我们这个table API里边的一个关键字,它指定当前这个字段是时间属性,是处理时间。
11:29
啊,这个比较特殊一点啊,好啊,那这里边给大家呃,简单的看了一下这个处理时间的定义,那大家比较关心的可能还有就是这个事件时间对吧,那我们看一下这个事件时间,前面已经定义了这么多了,那接下来怎么怎么去定义事件时间呢?啊,这个其实也简单。大家看就是前面我不是已经知道了现在的时间与意识事件时间,而且也已经分配了时间戳,知道了这个water mark是什么吗?呃,那接下来我这里就直接可以定义一个,比方说事件时间啊,这个even time对吧,我直接给一个ET,然后后边注意给的这个字段就必须得是。
12:10
呃,不能叫这个pro time了,对吧,大家可能想到那是不是叫even time呢?没有这个没有这个,呃,关键字啊,它叫row time,就是我们当当前的行的这个事件。你如果这么一定义的话,那表示追加的这个字段就是一个事件,事件,这个事件时间从哪里来?哎,就从我们之前定义好的这个,就当前的那个时间戳和watermark里边指定对吧?啊,当前的那个时间戳我们不是已经指定怎么提取了吗?那这个方法就是告诉我们按照之前定义好的那个多追加一个ET字段就可以了。那有些有些同学可能也就看到了,那我何必这么麻烦呢,之前的这个字段我本来就有time stamp,那那我直接用这个行不行呢。这里大家稍微注意一下,这里边我们的这个time stamp跟我们真正的这个就是even time,这个时间其实不一样,为什么呢?因为even time我们这里边还做两步转换操作,因为这里之前本身它有可能是一个,呃,就是有可能是一个这个。
13:16
秒做单位的一个长整型,我们现在有可能成了个1000,对吧,这个数字有可能变了,另外呢,呃,当前这个还有可能就是加了一个那个英文time的一个watermark延迟,这两个你本身在原先的这个字段上是体现不出来的,对吧?所以说后边你要做这个时间字段指定的时候,如果用事件时间的话,那不能直接用我们当前这个time STEM这个字段啊,就必须得定义这个row time对吧?把某个字段指定成row time,这个才是用我们之前得到的那个事件事件。但是有同学可能就会感觉你这就这就time STEM这就就多了这么一个东西,好像就没意义了嘛,对吧?嗯,你这相当于他俩是指的是一回事嘛,啊那所以当然你可以就是说在做这个操作的时候,可以把它直接去掉,对吧?啊可以把它去掉,但是这里大家要注意啊,你这里边如果要去掉的话,你得注意啊,就相当于。
14:11
我们这个对应的字段,你得让他不能以这个就是就相当于你必须是以这个名称,如果是基于名称来做对应的话,哎,那后边这个字段它可能就。就就不知道它是什么了,对吧,那你如果要是基于位置去做对应的话,那后边相当于我们这里边这又不是追加的,这是默认的你那个第三个那个呃字段那个元素了,而且之前那个位置大家知道我们是time STEM是在前面的嘛。啊,那如果说我直接就想直接用一个字段定义啊,不要有这个两两个都写在这里的这个形式怎么样写呢?啊,有另外一种简单写写的方式,我直接可以,哎基于之前的那个时间字段,直接点pro time,呃点点那个row time,现在是事件时间啊,直接点row time就可以了,这是表示什么呢?哎,就是用之前本身time Sam那个字段,现在要把它的值改成什么呢?改成我们当前的那个事件时间定义好的那个值,然后啊基于它。
15:17
这个定义它是一个时间属性对吧,在表里边它就有一个特殊的这个row time这个属性,后面我们涉及到的事件时间都是它啊,所以往往大家看这个代码里边容易这么去定义啊,因为它本来就是一个字段嘛。那另外就是说还可以去重命名对吧,比方说我觉得这个。这个time Sam这个名字不好,因为后面那个呃,CQ里边大家知道time Sam本身是一个关键字,所以我可能给它做一个重命名,那之后我的这个时间字段就叫什么了,就叫TS了啊。它是我们基于前面的这个STEM waters提炼出来的那个事件时间,诶,给到了这个字段叫做TS。这就是呃,最简单的一种定义这个事件时间语义的方式啊,就转换成流,然后按照我们之前熟悉的这种方式把它先定义出来,在这儿去指定一下就完事了。
16:11
哦,那呃,这种方式是最简单,大家会用其实也就够了,但有些同学可能觉得那这种方式还是依赖于就是前面我们这个流的转换嘛,对吧,那假如说我之前这个数据读取的时候,我没有用先转换成流的这种读取方式,我有可能就是直接在这里边用了一个这个connect,对吧,然后直接把这个连接外部系统读进来之后,直接注册,注册成表了。那这个读进来之后,我怎么去定义,呃,当前的这个时间字段呢。哎,这里边给大家再讲讲其他的一些定义方式啊。我们继续往后看啊,那大家看到就是在这个,首先我们这里边有有这么几种方式啊,第一种就是在这个data stream转换成表的时候来指定,对吧,这是最简单也是最舒服的一种方式啊,然后另外还可以怎么样呢?那就是你在定义跟外部系统连接直接注册表的时候,我们当时不是有很多那个sIgMa的定义吗?诶,这里边我们就是相当于把这个表结构啊,里边对应的那个字段都定义好了,所以这个table API提供了一种方式,就是你可以在它后边直接追加一个field。
17:24
就是在指定那个table sIgMa的时候,直接后面跟一个field,比方说我当前就叫PT,对吧,但是你得指定清楚它后边的type,就是它的类型必须得是time step3,然后呢,后面要加一个点pro,对吧,必须要把这个追加进来。啊,所以用这样的方式也可以指定当前这个字段,新增了一个字段,这就是当前的这个,呃,就是处理时间对吧,或者说你如果想用这个事件时间的话,那同样后面就是点row time对吧,用这种方式。啊,这里大家需要注意一下,就是你如果说直接用这种方式去定义啊,在有些场合下边它会报错啊,就是这里边就是说你如果在卡夫卡这里边去去定义的话,这里边我们直接给大家举一下例子啊,比方说这里我直接加一个field。
18:17
这个PT对吧,呃,然后后边我这个data types.time stamp3。然后后边直接定义它是这个啊,大家看有这个对吧,我这里边是pro time啊,直接把它定义出来,这样运行是呃。本身是没有问题的,可以这么去运行它的这个,呃,如果说大家把这个在所有的这个连接器里边都做一个迁移的话,会发现有一些连接器它还是还是不行的,不太好使,比方说什么呢?这里边我们给大家讲的这个文件读取,你在这儿如果要是直接加这个的话,哎,后面运行就会发现它报错。这个原因还是在于就是说,呃,我们当时给大家讲过那个具体它的实现啊,其实是呃,CSV有一个table sce对吧。
19:08
这是那个是think啊,我们现在如果要读取数据的话,这是一个这个table sce,那这里边它实现的这个接口是什么呢?就是string table sce和vegetable sce做这个,呃,做这个流处理和批处理,对吧?哎,那大家再看一下这个。这个卡夫卡的table啊。就看到他俩的这个差距了啊,卡卡table sce base,大家看它除了这个stream table sce之外啊,当然它只有stream,没有批处理的啊,它还实现了什么呢?DeFined。Time。Attribute DeFined time attribute。啊,就是这里边必须我们这里边的这个table sce啊,啊,它这个描述器必须得实现这样的两个接口,就是允须我们定义pro time和row这里边的这个调用。
20:01
在这里边直接点pro time.road time这个才是有意义的,对吧,才不会报错,你如果直接它没有定义这个接口,你在这里边要去做这个,就是我们这里边这个CSV这里要去定义的话,那它就会报错。啊,所以这里边其实就是底层还是看它支持不支持,如果说他要是根本就没有支持,大家看这个interface里面,就是你可以get当前的这个描述器嘛,对吧,如果要是本身当前这个你的这个连接的这个外部系统啊,它就不支持这种方式的话,那你就不能这么写了,你该转流还是得转成流,对吧?啊所以这是给大家稍微的多说一句。啊,然后当然了,还有另外的一种方式,就是DDL啊,大家知道之前那像那个MYS对吧,如果说啊,当然MYSQ现在它并不支持这个S的方式,对吧?啊,那假如说我我之后有这个支持的方式了,或者说我前面这个卡夫卡或者其他的这个源,我就是想在这个DDL里面去去定义,去创建这个表,那怎么办呢?啊,那我这里边可以是怎么样,就是create的时候,后边比方说诶大家看这个定义,就是我直接连接的文件系统。
21:08
大家看这个文件系统也有这个直接用DDL创建的这种方式啊,啊,所以大家不要认为前面我们create table这个就直接是在MYL里边创建表了,不是啊,这是在flink里边创建表,它是后边这个with里边,这是我们连接到的那个外部系统,连接外部系统读取数据,然后对应的在flink里边,按照我们定义的schema把表创建出来。那这里边定义那个时间字段怎么定义呢?哎,你可以这里边给一个PT as pro time对吧,来追加一个这个呃字段啊,然后我们把它指定成as time,这就是一个这个呃,给我们底层实现的一个方法,在这个flink CQ里边一个方法as pro time就指定了当前这个字段是一个处理时间。啊,这样的话后面就可以直接用了。啊,那当然了,如果要是这个事件时间的话,那就得是RO对吧,但是rowtime相对来讲又复杂一点,为什么呢?那大家想到你这个还得有water mark呀,啊,所以呢,Water mark怎么样去定义呢?
22:12
哎,我们实践时间,呃,给大家再详细的说一说,这个怎么定义啊,第一种方法还是前面给大家说过啊,非常简单,直接用RO time对吧,你可以在后面追加一个字段也可以呢,直接把之前我们认为时间戳的那个字段直接给它点rowtime定出来就完事了啊这个是最简单的方式,那如果说要是用这个tables game里边去指定,那怎么指定呢?诶注意这里边就是直接对于某一个字段field啊,后边直接点roll time,这个还是必须他得支持才行,要不支持的话你不能掉这个肉ta。啊,那这个time里边传什么呢?之前我们看那个pro time就啥都不用传,对吧,因为它系统时间嘛,Row time的话,里边要去传一个new,一个row time啊,然后row time后面要去点timetemp from field,然后指定哪个字段,诶大家想这为什么我要去这么定义呢?
23:05
哎,这不就相当于你如果用这个tablechema的方式去定义,是不是就相当于我们前面没有那个流的转换过程了,哎,之前在代码里边,我们这里边之所以不用定义那么多啊,之所以这里边非常简单,直接就把它这个rowtime定义出来了,那是因为我前边的这个事件时间的提取,我已经知道它到底是用哪个字段对吧,做什么转换,然后呢,呃,这里边去,呃automark去延迟多长时间,这个我们在流里边用这个方法都已经定义好了,所以这里边你只要RO time,哎,这个把当时的那个事件时间给到这个字段上就可以了。操作非常简单,那这里边你如果直接connect之后,直接注册表。不做流的这个转换,那当然就没有前面这一步了,那所以先接下来就相当于这一步做的事儿我们都要完成一遍啊,那怎么完成呢?也简单,指定一个时间字段对吧?Time STEM指定时间字段,但是这里大家注意啊,这个指定之后,这个看起来好像没有这个做转换的操作对吧?哎,所以说你这里边可能指定的那个时间字段,你你当时去去我们做提取之前你就得先。
24:15
先做一个map对吧,类似于得先做这样的一个ETL啊,先把它转换好,然后后边这里啊,还得指定water呢,延迟呢,对吧?这里面大家看就是点r water marks period,呃,Periodic bonded,对吧?周期性生成water mark,然后设置一个就是最大的迟到时间啊,允许延迟的这个时间,那这里边给的是一个毫秒数,比方说延迟一秒给一个1000。啊,所以这个写法跟我们在流里边指定时间字段和这个提取时间戳,呃就是生生成这个water mark这个方法其实是一样的啊,但是他可能就是稍微局限一点,我们这里边没有那么呃在流里边的处理那么灵活而已,你之前得把那个该做的这个操作都ETL好。
25:03
这是这个在tablechema里边的定义,另外还有一种方式就是DD里面定义了啊,DD里面定义这个。呃,这个事件时间啊,Even time就相对来讲更复杂一点,大家看一下这里边的写法是怎么样,因为你这里边既要定义时间字段,然后你还得定义那个water mark对吧?诶还还得定义这个东西,所以这个就比较麻烦,这里边还有一个问题是什么呢?就是我们一开始传入进来的,比方说我这里边有一个TS啊,这个time stamp字段。它本身是个长整型。对吧,是个big int类型,那我们在这个CQ里边啊,在flink CQ里边,它本身要的那个时间戳又是什么类型呢?又得是一个time stem3,对吧,大家记记得之前我们那个要求的那个类型,那这里边它本身是个长整型,那我当然就不能直接把它指定成就是呃,这个。就是as一个一个一个什么什么东西,对吧,所以这里面大家注意啊,我们最后要的那个事件时间,其实是就是water mark water就是事件时间嘛。
26:07
之前我们是直接点row time,或者是怎么样直接把它指定了,这里边不能直接指定,还要先把这个TS字段做一个转换啊,比方说这里边大家看一下这个转换的过程是怎么怎么转的啊,这里边用的是首先啊from,呃,Unix time对吧,首先从这个标准时间戳,我们把这个先转换成一个,就当前CQ里边的,呃,就是这个。时间字段对吧,就是一个这个年月年月日十分秒的一个标准的这个CQ的这个时间类型,然后接下来呢,再把它转换成一个标准的time stamp类型,做了两次转换,对吧,从一个时间戳转换成时间,然后再转换成呃,这个time stamp类型就从一个长整型啊提取出来转换成时间。呃,接下来这个叫做RT,就是RO对吧,简写叫做一个这个新的字段,然后这个新的字段呢,基于它再去做。
27:07
操作来定义这个延迟时间,定义一个water mark,那大家看这个water mark是怎么定义的呢?呃,Water mark for real time as。RT对吧,这里边就是说我们这里边真正定义的这个字段,其实就是这个roadtime嘛,RT嘛,而这里边我们要呃这个再再把这个water mark定义出来,它对于我们这个字段的watermark是什么呢?啊,For rt as rt减去。这个INTERVAL1秒钟对吧,一秒钟的间隔啊,所以这里边就是有了这么一个比较复杂的一个语法定义,大家要搞清楚,这里边就是怎么样去做的这个操作啊,指定当前的时间字段,而且要指定这个water mark延迟多长时间。啊,就是这样的一个操作。好,这个就我们不在代码里边给大家具体去实现了,感兴趣的同学可以下来之后,呃,对应的在代码里边把这个呃,每一步啊都做一个操作。
28:10
这就是这个时间语义的定义。
我来说两句