00:00
那我们再讲另外一个非常重要的概念,那就是时间特性了,啊,大家知道在flink里边时间语义,时间特性其实是一个非常重要的特色,对吧?呃,它跟Spark可能最大的一个差别就是它支持不同的时间与支持事件时间,然后就可以在流出里连续不断来的数据里边保证它的快速处理,呃,低延迟还能够保证结果的正确性,对吧?啊,就可以把这个做一个权衡,那在这个table API里边,我们现在要基于表去做这个操操作了还能够去定义不同的时间特性,还能够做这个,呃,类似于watermark给一个延迟,对吧,然后去处理乱序数据吗?能做到吗?当然是可以的,对吧?啊,所以在我们做这个实际项目的时候,只要基于时间的操作,比方说窗口啊,大家想到那个table API里面是不是也得支持窗口啊啊,那所以基于时间的操作,那都得跟时间语义相关,那么是另外就是你如果用事件时间语义的话,是不是还涉及到一个从数据里面提取时间戳的那个步骤啊,诶这个都是,都是需要大家去考虑的啊,那当然在这个table API里边是支持的啊,它table里边可以给我们提供一个逻辑上的时间字段。
01:19
然后再。处理表处理的这个程序里边,代码里边就可以用这个字段单独表示当前的时间和访问相应的时间戳的那些操作啊,都是用这个来表示。那所以这个时间属性,它其实可以是每个表STEM里边的一部分,就是我们定义的时候,其实这个这个当前时间属性啊,你可以认为就是我们表里面的一个字段,一旦你单独把它定义出,定义成了时间属性,那么后边我们用到时间的地方就都以当前这个时间属性为准。那么它同时呢,也可以作为一个字段引用,呃,那么另外就是说它访问的时候可以就是就相当于是一个常规时间戳一样,但知道这个CQ里边的TIME3类型对吧?那么我们定义了这个时间属性之后,它的类型就是时间戳可以访问,也可以进行计算。
02:11
这个是时间特性的一个基本概念啊,然后给大家简单的说一下,那主要就分成两大类了,我们关心的当然就是processing time和event time,处理时间和事件时间,先看处理时间怎么定义,呃,那在处理时间语义下,大家知道就是机器的本地时间了啊,这是最简单的概念,它不需要我们去提取时间戳,也不需要生成watermark,那是不是只直接只要指定一个字段就可以了啊?所以大家看一下当前我们定义这个处理时间是怎么定义的?这个方式其实特别的简单粗暴。最简单的一种方法就是。啊,来看我这是利用了流对吧,之前我们不是说可以先读成把把这个数据读进来,读成一条流,然后把流转成表吗?大家看流转成表的时候,当时我们是不是from data stream把流传进来,然后指定字段啊。
03:05
在指定字段的时候,我可以直接在后边追加一个字段,然后加上一个点pro time啊,就是pro time,就是process processing time对吧,PC这个是这个缩写啊,前面的几个字母,所以它其实就指定了,我无中生有的直接声明定义了一个字段叫PT,这个PT是啥呢?是当前的处理时间,对吧。所以如果我们这么直接定义的话,就相当于是扩展了当前的物理scheme,就把我们的这个表字段大家看由三个是不是就扩成四个了,就这么简单是吧?啊,这个可以给大家在代码里边稍微的看一下啊。我们还是新建一个测试的class。当前我们这个测试的话,大家大家知道这个单独的测时间语义其实没什么意思,肯定是要结合window一起测,对吧,所以我们就是time and window把这个写出来。
04:10
呃,然后前面的这个过程都一样,对吧,我就直接先把这个环境先创建出来啊呃,Table这里,然后接下来我们想做的操作是什么呢?把这个。Env啊env execute这个执行先执行起来。接下来其实我是想要先把这个啊,先把这个数据先读进来,读成一个流。我们先读入文件数据,呃,得到一个数据流data stream,所以接下来这个操作的,呃,大家会发现是不是跟这个我们一开始这个1EXAMPLE的这种方式差不多啊,先读成一个string类型的流,那接下来是不是我还得转换成这个,呃呃,这个对应的这个po类型啊,Sensor reading对吧?然后后边我才能就是知道里边的那个字段每个叫什么名嘛,你转换成table这个才方便嘛,所以大家会发现我们这种方式方便,主要是因为前面这个流已经帮我们做了很多的操操作对吧?啊,相当于是把这个基本的转换都已经搞定了啊,啊,当然这里边你也可以把这个S就不要了,就叫data stream就完事了。然后这里边我们是。
05:30
读取文件数据,然后第三步转换成一个po,然后下边。第四步就可以将data stream啊,将这个流转换成表,然后这里大家还要注意,我们准备要定义时间特性对吧?好,接下来我们就准备做这件事情,呃,那这里面我其实就是基于这个data stream啊,其实是table env,要去调一个from data stream方法,然后把这个data stream传进来,然后后边是不是我可以直接指定当前的字段啊,那之前我们说那个转换的时候,不就是它可以灵活的指定字段吗?而且还可以给它做重命名,对吧?所以这里边我们指定字段啊,第一个ID啊,第二个是这个time time stamp as呃,TS啊,比方说我给一个这个名字叫TS,另外还有这个temperature as。
06:39
Temp对吧,我直接给一个这个temp名字,然后另外如果说我想要当前的这个处理时间语义当前的这个字段的话,我可以直接定义一个,你就随便叫啊,我是叫PT,你想叫另外叫叫什么T对吧?RT什么这个各种T啊,什么time对吧,只要不跟我们CQ里边那个关键字呃冲突就完事了,PG.pro time对吧,Proc time啊,Processing time,这就是我们当前定义好的处理时间当前的这个字段,那这里边大家看到。
07:13
得到的其实就是一个table,对吧,我可以直接把它叫做data table,当然了,这个table没有任何的,呃,新鲜的东西啊,都是就我们那三个字段嘛,这里面是多了一个当前这个是不是多了一个这个pro time呀,多了一个处理时间,那大家可能会想到,假如说我把当前的这个data table啊。做一个打印输出啊,就给大家看一下当前这个这个状态到底是什么样,那当然我现在要去是不是to stream啊,转成流做一个做一个输出啊,后边我用这个肉点class。把这个肉要引入啊。呃,我们直接用的是flink types对吧,点class。
08:04
做一个print,大家可以看一看当前的这个效果到底是什么样子。好,大家看到现在已经执行完毕,大家看这就是我们当前得到的结果,是不是变成了四个字段呀,后边是不是莫名其妙的直接自己就追加了一个字段呀。对吧,而且就是直接显示我们当前的这个系统实践啊,所以这个其实这就是非常简单的一种使用,这在实际应用的过程当中,大家如果觉得后面那种方法很复杂的话,这种方法其实是最直观,我我个人也是最推荐的一种方法啊,直接这么用就完事了啊,那有同学可能想这个这么奇怪,它是个什么类型呢?当前那个PT到底是个什么类型啊。啊,这个大家可以还记得我们之前有一个方法叫做对我可以直接把它做一个print scheme这样的一个打印吗?我们看一下它的这个类型是什么,它其实就应该是。
09:07
我们说就是时间戳类型对吧,CQ里边的time stamp类型,大家看一眼是不是当前这个类型是time step3啊三的话,大家知道这个意思是啥。这个是精度啊,精度相当于是三倍的精度,那其实指的是以秒为单位的话,是不是后面相当于三位小数啊,那相当于就是毫秒对不对,精确到毫秒的一个time stamp啊,然后后边你看到这里边是pro time,专门的有这样的一个特殊定义对不对,它是一个,这是非常特殊的一个一个字段啊pro time对吧,后面涉及到时间的时候用它。我们后面这是一个毫秒做单位的一个时间,对不对啊,这就是关于这个。处理时间特性的定义啊,那除了这种定义方式之外,另外呢,呃,大家就想到,那如果说我是直接,比方说直接从卡夫卡读取数据,或者说直接从那个MYSQL读取数据,读进来的时候,我不是先转换成流的,我直接就用之前的那个,我们讲那个connect方法对吧,直接就连接,然后就创建数据了,那怎么办呢。
10:11
那那就不能用我们这种就是流里边去做判断了,对吧?那接下来我们就是是不是直接在stemma里边追加呀,大家看到这里边就是直接field,这之前不是三个field吗?后边就是field,然后PT,然后指定当前的类型,必须得是TIME33对吧,然后接下来加一个proty,但是现在如果大家直接去这么定义这个sig码的话,可能还有一点问题,这个还是就是当前table API不是特别的完善啊,就这里边他执行的时候可能会报错,他说没有这个pro time这个方法就是没没办法去做这样的一个操作啊,所以啊,这个就是后面肯定这一部分会完善起来啊,大家知道有这种用法就可以了。这个是在定义那个,就是如果用connect,然后我们定义tablechema的时候,可以这么指定,大家还记得之前我们那个MYSQL定义的时候是。
11:05
那个我们连接外部系统,连接外接口的时候,直接写的DDL对吧,那DDL里边又怎么指定呢?啊,现在再给大家讲一个,就是DDL里面的生命方式啊,大家看就是create table,这还是这个对吧?Create table大家注意现在还是在字段里边要追加,只不过呢,前面三个字段啊,TS对吧,这个temperature double,然后下边有一个PT,又是又是平白无故啊呃,无中生有就来了一个字段叫PT,直接PT as pro time直接这么写就可以了。然后注意这个pro time这里它其实是一个,这里是个函数对吧。这其实掉了一个函数啊,这是我们在声明当前声明这个表的这个列的时候,当当前这个字段的时候,直接就掉了一个函数,做了这样一个处理,As这个pro这种用法,这是这个叫做计算列这样一个用法。
12:02
这只有blink里边才支持,所以大家如果想要测这种方法的话,那你必须得用blink的planner对吧,前面做设置的时候必须把它设成这个blink啊,这个大家就是下来之后感兴趣的同学可以试啊啊那这关于这个定义处理时间就是这样一个过程。
我来说两句