00:00
了解了流中表的概念,以及table API和Li CQ的基本用法,那接下来我们就要来说一说在处理当中使用表,使用table API和flink CQ有哪些比较特殊的东西啊?那最重要的比较殊的东西当然就是基于时间的相关操作了,比如之前我们比较熟悉的时间窗口相关的一些聚合和计算。我们知道在flink里边它是有不同的时间语义的,所以在data API里边我们可以单独的去使用事件时间语义和处理时间语义,同样在table API和flink CQ里边也可以去指定当前的时间语义。所以接下来这一部分我们就来讨论一下在liq table API当中怎么样去定当前的时间性以及怎么样去使用窗口。因为我们知道时间属性的应用一方面是在底层的process function当中去定义注册定时器,另外一个那就是时间口了。
01:10
啊,我们知道在table a CQ这一个层级,应用层级肯定不会涉及到底层的定时器,那最重要的应用当然就是窗口,所以我们把这两部分放在一节里边去进行讲解。那所谓的时间属性呢?其实就是在当前的flink CQ里边,我们会给表里专门提供一个特殊的字段,这个字段就是一个逻辑上的时间字段,这样的话,相当于我们在这张表里边每条数据里边就多加上了一个时间的属性。那本质上来讲的话,相当于就是我们当前的数据,每一条数据,呃,那对应的每一个字段就是我们表里边的一列了。
02:01
每一个column。考米考二。本来如果有两个字段的话,现在后边要再去追加一个时间属性字段。现在的整个表结构就变成三个字段啊,所以我们就会发现,当前既然它可以看成是表结构的一部分,那我们当然就可以在创建表的时候,使用DDL的时候直接就把它定义出来啊,那另外还有一种使用定义时间属性的方式就是。我们把datare转换成一张表的时候,也可以单独去指定时间属性是什么?那一旦我们在table里边表里边指定了时间属性,那么它就可以作为一个普通的字段。进行使用了啊,它的类型是time,就是在表里边所常见的我们关系表里边常见的这个时间戳的数据类型啊,在link CQ里边同样也是有这样一个类型,那他的行为呢,就类似于常规的时间戳,可以进行访问,可以进行计算,可以进行转换,这里要注意就是它一旦进行转换之后。
03:15
它就变成一个普通的字段了,不再具有时间属属性的含义了啊,所以我们在使用的时候还是要相对要注意一点,小心一点。那当然了,按照时间语义不同,我们把时间属性的定义就分成了两大类,一大类事件时间,另外一大类是处理时间。我们最关心的当然是事件时间了啊,使用flink进行流处理的时候,我们当然是希望使用事件时间,然后正确的处理迟到的、乱序的数据。所以接下来我们首先考察的就是事件,事件事件时间,其实我们都知道,它主要最大的用途就是处理这个乱序数据或者延迟,延迟事件嘛。所以。关键就是要设置一个所谓的水位线啊,那这个水位线它是基于什么来设,是基于当前。
04:09
数据本身自带的某个字段,我们要提取一个当前的时间戳啊,所以在。逊使。嗯,Time盘water这样一个stream的方法,然后去入一个水位线的策略啊,这样的话就可以指定当前怎么样去提取时间,怎么样去生成,那同样在当前linkq里边,我们也是要。提取出时间戳,然后基于时间戳去生成奥马那。生成指定对应事件时间属性的方法呢,就是有前面我们所说的两种,第一种就是在创建表的DDL里直接去做一个定义,我们可以看一下。
05:00
这里我们就直接在文档当中可以看到很清晰的看到这样的一个声明的方式,方法就是首先我们看当前的event table,我们create table创建一个这样的表,连接器表,然后后边首先定义当前的字段,User和URL,这两个是本身我们数据里边就带着的字段,然后另外我们记得还应该有一个time啊,这里我们是直接定义了一个TS,它本身的数据类型就是time stamp3这样的一个类型,这里的三指代的是精度。时间戳的精度。当前我们对应的。时间应该是精确到毫秒的啊,就是后边三位小数的话,那当然就是毫秒数了啊,那所以当前TS是一个time stamp类型的时间字段啊,那么当前我们就可以直接把它当成整个表里边的时间属性。
06:01
然后接下来我们要基于TS去生成要指定的生成策略,我们可以看到下边直接有一个关键字,就叫做water mark,然后four ts。含义就是说我们基于TS,然后去创建一个watermark,生成一个water,然后后面呢,又有一个关键字SS什么呢?这就是表示当前water mark生成的具体的时间值是什么样?那当然了,这里边其实就是一个基于时间戳去减去一个延迟时间了,水位线的延迟,这里边很明显TS减去T5SECOND,很明显设置了一个五秒的水位线延迟。而这里边我们的五秒是用这样一个表达式来单独定义的,就是INTER5SECOND,这是固定的这样的一个形式,就是inter是关键字Q里面的关键字,后边呢,是用引号引起来的一个数值,在后边是单位,当然我们这里是second啊,那如果要想要用其他的啊,Minute啊,甚至day这些都是可以的啊,当然我们的水位线延迟肯定就都是比较小的了。
07:18
然后另外就是在flink当中支持的事件时间的属性,那它的类型必须是timemp,或者后边加一个杠LTZ。也就是所谓的local time zone啊,就是本地带着本地时区信息的这样一个时间戳啊,本质上它都是一个time STEM时间戳,必须是这样的两种类型啊,那一般情况下,数据里边的时间戳,如果本身就是年月日十分秒这样的格式的话,我们可以直接把它定义成就把它提取出来,直接指定成类型,然后定义这个TS。这样一个时间字段属性就可以了,那如果说。
08:04
当前的时间,原始的时间戳,就像我们在给出的这个数据里边啊测试的数据一样,直接给的就是一个长整形的毫秒数,那这个时候又应该怎么定义呢?啊,那这个时候的话,我们应该要用另外一种方式。我们可以看到这里的话,首先当前的TS是一个big in长整型,然后接下来呢,要先把它做一个转换啊,就是我们这里边可以比方说把它转换成一个time stamp类型,或者转换成一个time stamp l类型,就是我们说的带着本地区的这样类型啊,如果我们本身拿到的这个时间戳就是本地的。我们本地啊,时区对应的一个时间戳的话,当然是转换成LTZ更方便啊,如果是标准时间的话,就是我们所说的UTC时间啊,呃,伦敦的那个格林威标准时间的话,那自然还是不加LTZ,就是转换成time就可以,这个转换的过程我们看这里边其实也是。
09:05
As,什么呢?后边这是一个link CQ给我们提供的对时间进行处理的函数,叫做two time l TZ,就是把当前的一个长整型值转换成。转换成一个。LTC类型的值,那转换呢?转换TS后三当然就是精度啊,所以这样的话就转换成了一个毫秒精度,毫秒级别的。TC时间啊,那当然接下来同样就是现在的时间字段就不再是TS了,时间属性是TS,我们叫做TS_LTC啊,那么我们就是for,然后S当前的时间戳要减去啊,对应的这个water map的延迟时间。这样的话我们就可以。
10:00
通过一个原始数据长整型的转换成一个时间戳类型,然后明了当前的时间属性字段,然后还在此基础上定义了曼。接下来我们可以在代码当中对这一部分内容来做一个具体的实现,我们看一看到底是怎么样去定义的啊,那接下来我们可以在。当前的包下边去新建一个Java plus,当然我们现在主要是测试一下这个时间属性,那另外我们后面时间属性怎么用呢?当然是跟窗口有关了,所以我们这一节就都放在一起去测试time and window。Test。把这个Java类先创建出来。然后接下来我们还是按照之前的习惯,先把整个框架起来。然后接下我们首先需要去。创建出当前的表环境啊,那表环境有两种创建方式啊,我们提到那最简单的一种方式,当然就是还是先。
11:04
获取到当前的流处理环境,Execution environment,这个之前熟悉的方式可能会更加简单一点,而且主要是考虑到我们后边的一种定义时间属性的方式,是从流里面去定义的嘛,流向表转换的时候去定义的,所以这样的话后面我们也会更方便一点啊。当然了,为了。测试的结果更加的明显,我们还是可以把全局的并行度先设成一啊,这样的话更简单一点。接下来就是table environment,要去调create方法,直接把进来就行了啊,要不然的话我们还得去定义那个environment settings啊,那所以接下来我们可以得到这样的一个tableable env。创建出来环境啊,那首先我们应该要读取数据源,这里边我们读取数据源的时候,还是去创建一个连接器表就可以了,哎,那所以这里我们可以直接把。
12:01
之前的这个过程全部copy过来。我们现在是第一种方式。在。创建。表的。DDL中。直接定义时间属性。那呃,当前我们已经有了基本的这个表的定义了,Table,然后对应的三个字段,然后接下来呢,我们当然就是需要去新增加。新增加,呃,至少应该有两行吧,一个是当前我们指定的时间属性,另外还有一行其实不是我们的,呃,新增的一列我们知道是单独的,要声明一下当前的mark是什么,都是关键字啊,那所以接下来我们就可以定义一下当前新的时间时间属性,比方说我们当前是事件时间嘛,所以我们干脆就直接把这个定义成even的time吧,或者简写就叫ET。
13:01
Time的简写啊,那接下来当然是as as现在要把当前的时间戳转换成一个time STEM类型,长整型转换成时间戳类型,那现在我们的这个测试数据呢,其实这个。不用去划分十区对吧,如果是划分十区的话,我们当前北京时间是东八区,那东八区的话直接给一个这个一秒两秒的话,换换算成格林威视的标准时间反而变成负的了啊,所以其实我们这个没有必要,我们直接就把这个测试的数据当成标准时间就可以了啊,那所以当前呢,也不需要去to,我们这里接to。Time spent。就可以了,然后后边就相当于调用了一个函数,后边我们需要把它做一个对应的转换,那之前我们是直接把这个TS就拿过来放在这儿,但是现在呢,呃,我们现在。需要去。我们之前的想法应该是直接把TS进来就可以了,但是很尴尬,在flinkq底层给我们实现这个函数的时候呢,它并不是传进来的,是一个长整型值,它是要传进来一个string,也就是年月日十分秒这样一个格式的string传进来才可以啊,那这个怎么办呢?啊,没关系,我们再调用别的函数,再做一重转换就可以了,诶,那这里要调的是一个from。
14:27
X这样一个方法。哎,这样的话,它可以把一个长整型的值转换成一个string,就年月日年月日十分秒这样一个格式的的ix time我们知道就是标准的呃,UC时间啊,那所以当前我们传入TS,注意这里边要求传的单位还得指定90秒啊,那所以这里面我们还得除以1000,这样的话我们就可以得到对应的这样一个time stamp类型了。
15:00
那当然了,后面我们需要继续再一个加注意,这里边应该还有一个逗号。那上面这里这个TS这里也需要去补一个逗号才可以。然后后边当然还有一个就是。然后for啊,那for的话,当前的ET我们主要是以它作为时间性字,然后去对应的watermark的,那我们现在可以就去定义一个当前的延迟时间了,那比方说这个就稍微短一点吧,因为我们一开始第一个数据就是一秒来的嘛,那我们就直接延迟一秒好了,呃,注意这里边的。书写方式是interval,然后加一个。一后面是当前的单位啊,这样的话,我们就把对应的。字段全部定义完毕了啊。呃,那呃,后面我们已经有了括号,那这个后边就不需要再加括号了啊,这样的话,所有我们要想表达的东西就都已经写在这儿了,后边的话,那当然就是还是连接表该连接谁,那就指定对应的connector,然后后边相关的参数配置进来就可以了。
16:14
这就是我们在代码当中创建表的DDL里直接定义时间属性的方法。
我来说两句