00:00
我们已经了解了flink table API当中表的概念,我们发现它是动态变化的,所以把它叫做动态表,那动态表跟流之间的相互转化呢,其实核心在于中间我们进行的这个持续查询啊,我们说根据持续查询是否会带来结果动态表里边的更新操作,那可以把它划分成更新查询以及追加查询两大类啊,那假如说我们这里要做的这个转换计算并不涉及到最终动态表里边的更新操作,诶,那么它其实就是一个追加查询,后面如果把它转换成流的话啊,那其实直接to stream啊做一个。仅插入流或者颈追加流的编码就可以了,那如果说我们的CQ当中出现了一些更新操作动态表里边它的结果要改变,那在这种情况下,我们就把它叫做更新查询相对应的结果动态表,诶,出现了更新操作,那如果要做编码转换成流的时候呢,就只能是转换成一个change log,一个更新日志流啊,那对应的编码方式可以是撤回流retra的方式,也可以是更新插入流UPS的方式,这就是前面我们介绍的整体的流程。
01:17
当然了,在这个过程当中,我们会发现啊,怎么样去判断一个持续查询到底是否是更新查询呢?那一般我们来说,如果出现了分组聚合这样操作的话,那那一般就是新来数据之后,要去更新之前的聚合结果,它就是一个更新查询。但是凡是总有例外,前面我们介绍了一个比较特殊的场景,它尽管也是分组聚合,但是呢,它其实只是一个追加查询,这就是我们所说的窗口聚合,如果我们是基于时间开了一个窗口,然后每个窗口统计里边某一个聚合值,我们统计这个URL的抗数量的话,那么每个窗口都只会输出当前的一个结果,诶,输出之后这个就不再改变了,所以在结果动态表里边其实是只有追加,没有。
02:06
更新的啊,所以在这种场景下啊,后边我们同样不需要把它to change log啊,不需要去做这个更新日志的转换,而是直接在后边追加结果就可以了。那前面我们提到这一部分窗口聚合呢?呃,因为没有详细的展开,我们其实发现啊,这个窗口聚合在CQ当中的写法是有很大的不同的啊,因为本身我们知道在CQ当中对于窗口其实是没有相应的直接支持的,而窗口操作呢,在flink流处理当中又是一个非常重要的特色啊,我们知道flink可以说跟其他大数据处理工具最大的一个区别,或者最大的一个优势就在于它灵活丰富的窗口操作,那那所以在弗林CQ当中,我们现在是一个高层级的API,当然也要把这个优势发扬光大。那到底怎么样在CQ当中去使用窗口呢?接下来我们就重点讲一讲link CQ当中的窗口啊,那说到窗口最重要的主要就是时间窗口了,它是一个基于时间的处理操作,那既然涉及到了时间,那我们就首先需要去定义相关的时间语义啊,到底是事件时间语义还是处理时间语义,而且呢,我们还需要去确定当前时间到底是根据什么数据去提取出来去确定的,就特别是事件时间语义下,我们知道一般都是从数据当中某一个字段去提取对应的时间信息。
03:34
所以对于table a和CQ而言呢,啊,这里边窗口操作往往就是跟时间属性直接相关的,所以在KAPI和CQ里啊,它是会给表单独加增加一个逻辑上的时间字段,这个时间字段就叫做时间属性啊,这个时间属性我们可以认为就是当前我们有一张表,比如说已经有这么几个字段了,已经有这个user URL,另外可能还有一个我们所说的这个TSMSTEM这样一个时间戳,那有了这三个字段,我们把这个表的结构创建出来之后呢,如果接下来想要针对它去进行时间窗口操作的话,那就必须还要单独去指定一下当前的时间语义和时间属性到底是什么。
04:22
哎,那我们可能会觉得奇怪,这不是已经有time STEM了吗?对于flink来说,我们当前数据里面的每一个字段并没有特别的含义,所以你必须得显示的告诉他到底哪个字段是时间属性,而且这个时间属性有可能还不符合我们的格式啊,必须得转换成在CQ当中用到的time stamp这样的数据类型才可以。所以接下来我们在这个。Link CQ的操作当中啊,首先就应该先去指定一个当前的时间属性。其实这个操作我们也并不陌生啊,在data streamam API里边,一般我们做的操作是什么呢?哎,那就是来了这个数据读取数据源之后啊,后边基于data streamam可以调用一个assign time stepmps and water marks这个方法啊,那或者更加一般化的,如果生去流的话,我们调用的是assign asending time stepmps这个方法,那它的作用是什么呢?呃,其实主要就是我们前面说的,现在数据里边三个字段都已经有了,那到底哪个字段能够代表我们当前事件时间的进展呢?能代表当前数据它所发生的时间戳呢?那我们要指定时间戳提取的方法。
05:33
把时间轴提取出来之后,另外我们说考虑到这个时间时间里边乱序流的处理,诶,那还需要去生成watermark,所以一般情况我们针对乱序流,那是给一个watermark的延迟时间,基于当前最大的时间戳去减去一个时间间隔,得到的就是对应生成watermark的时间。啊,这是我们之前在这个data STEM里边的操作,那现在table API啊,其实想要做的这个事情啊,也是非常的类似,就是必须在数据已有的字段里边提取某一个字段出来,指定它就是当前的时间,然后接下来就可以基于这个时间去做窗口操作了。那对于table a和CQ而言呢,整体来讲,呃,我们可以把这个时间属性啊,按照时间与意义的不同划分成两种情况,一种就是事件时间,另外一种就是处理时间了。呃,首先我们肯定关心的就是事件时间,因为在实际应用场景下啊,我们数据可能有这个乱序,可能有比较复杂的情况,一般我们都是按照事件时间来进行处理的。
06:37
那前面我们也已经提到了事件时间与意下啊,要去指定当前的这个时间属性啊,要去做窗口操作最大的一个步骤,那其实就是分两步走,首先我们应该从数据所有的字段里边提取出对应的时间戳,哎,把它指定成某一个时间属性,然后呢,如果说我们涉及到这个有延迟事件,有乱序数据的话,那还应该再去基于当前的时间戳设置一个watermark的延迟啊,所以主要是要做这两件事情。
07:10
然后呢,我们就看一看在具体应用的时候,到底在这个table API和CQ里边啊,怎么去定义,怎么去用啊,那其实我们会发现这个table API和CQ啊。它底层都是基于表的操作,本质上都一样,但是呢,具体形式那就table API看起来更像是我们skyla或者Java代码当中的一个方法调用,而CQ呢,诶,那就是直接在我们熟悉的CQ一个字符串里面啊,直接去写对应的CQ语法了,所以这两种方式呢,也对应着我们定义时间属性的不同方式啊。这里我们就分别来介绍一下啊,那首先呢,就是直接在CQ当中就定义了啊,也就是说我们在创建一张表的时候,之前不是说过可以创建一个连接器表吗?哎,我们直接create table,某一张表even table,然后指定对应的字段到底是什么名称是什么,类型是什么,后边加一个with子句,指定连接器,那在创建这张表的DDL里边。
08:09
我们就可以单独的去确定一个时间属性,因为前面我们说了啊,所谓的时间属性可以认为就是在原有的数据基础上多加了一个属性字段嘛,哎,那所以在这儿我们看DDL里边就直接可以去,呃,之前我们这个不是有一个TS吗?注意TS如果说它的类型直接就是time stamp类型的话。这是CQ里边的类型。那么它就符合我们CPU当中要求的时间属性字段的数据类型,那这样的话我就可以直接把它指定时间字段,那怎么样确定它是时间属性字段呢?呃,那后边我们只要再确定一个water mark基于谁去生成,那么对应的这个字段就是时间属性。所以我们看后面这里有一句water mark for ts,好,那既然是for ts,所以自然这个TS就是当前事件时间与以下的时间属性字段了。
09:07
它必须是他类型后面这个三是表示精度的啊。然后我们看这个water mark,它后面生成定义的这个语法又是什么样的,For ts,哎,这是指定的时间属性字段,基于它去生成水位线,后边呢,SS指定的就是water mark的生成策略,那么它到底怎么生成呢?就是基于当前TS,这其实说的是当前最大的时间戳去减去in interval5second,哎,所以这后边给的这个其实是CQ当中flink CQ当中啊,对于时间间隔一段时间的一个特殊的表达。这是固定的模式,就是前面一个interval关键字,然后后边呢,呃,用引号引起来的一个数字,一个数值,然后后边再加上时间的单位,所以INTERVAL5SECOND就表示五秒钟啊,那么TS减去五秒钟,这其实就代表了我们watermark水位线的生成策略是基于当前最大的时间戳,再延迟五秒。
10:09
啊啊,这里面的这个as啊,其实是fli CQ给我们提供的一种非常特殊的用法啊,它的特点就是说我可以基于现在已有的一些实际有的这个字段啊,再去扩展,再去生成计算出另外一个新的字段,哎,所以呢,这种方式有时候会把它叫做计算列,我们可以生成一个新的列,在表里边追加这样的一个列。这就是我们在这个DDL里边去指定时间属性字段啊,而且确定这个water生成方式的方法看起来有点奇怪啊,就是这里边我们是把这个TS已有的一个时间字段啊,直接指定成了时间属性啊,那我们会发现啊,这里边要求它后面的类型呢,必须是time STEM类型。这里要强调一点,就是flink CQ支持的时间属性字段必须是time step或者是time stamp杠下划线啊,LTZ这样的一个类型,这里的LTZ指的就是local time zone,也就是带有本地时区信息的这样的一个时间,初二啊,就是年月日十分秒这样的一个形式啊,一个time step这两种形式都是可以的,除了这两种之外,那就不能指定成时间属性。
11:22
诶,所以我们就发现这个跟我们实际情况不太一样啊,啊,就像我们现在的这种数据里边,后边这是一个长整型的时间戳,它并不是CQ的time STEM类型,那这个时候怎么办呢?哎,那就需要调用一些函数,把它做一个转换,转换成time Sam类型啊,比如说我们看啊这里的定义方式。这里的TS是一个长整形,那我们知道在cqu里边它的类型是begin,然后接下来呢,诶,那就需要单独再去生成一个字段,这个字段是基于TS去做的一个计算转换,所以我们看他就直接S,然后它的名称呢,叫做TS_LTZ啊就是相当于是基于本地时区经过转换之后的一个CQ当中的time类型。
12:08
那这里调用的方式是to time stamp l TZ,把长整型的TS传入,然后后面跟上一个三,这是精度,转换过来之后,它的数据类型就变成了time stamp l TZ类型,那么对应的这个TSLTZ就可以作为时间属性字段了。所以下面我们看到是water mark for t SL TZ,然后S以它作为时间属性字段去生成水位线。那我们会想到这里为什么非要把它转换成time stamp l TZ呢?我们能不能直接把它to time stamp,没有后边这个行不行呢?呃,也行,但是to time stamp的话,注意它后边传的参数不能直接传一个长整形啊,就不能传一个数值,它后面得传什么呢?就只能传一个符合标准格式的字符串,就是年月日十分秒那样的格式啊。诶,所以two time stamp用起来就会反而稍微麻烦一点,Two time stamp l TZ反而简单一点。
13:07
这就是在这个DDL里边进行定义的这个过程,我们可以在代码里边具体去做一个尝试,做一个实现啊呃,我们可以新建一个测试的object。你有一个SC的object。我们把它叫做主要是测试时间和窗口time and the window test。没方法先写出来啊,那前面的流程当然还是完全一样了,我们可以借鉴之前这个common API test里边啊,我们先创建一个。表执行环境,我们就直接基于这个流执行环境啊,流处理环境,创建一个table env,然后接下来呢,读取数据源,这里边我们读取数据源的时候直接就用。DDL的形式创建一个连接器表啊,我们从这个file从文件里边去读取,所以直接把这个可以copy过来。这里我们create table even table啊指定了当前的三个字段,一个是UIDURL,还有TS,这三个字段我们都能直接把它解析出来,那后边假如说我们想要使用这个时间窗口的一些操作的话,必须在这指定时间属性啊,那接下来怎么办呢?这里我们做的操作那就是再去定义,呃,注意这里的TS啊本身是b begin的类型,所以它不能作为时间属性字段,我们还得单独定义一个时间属性字段,呃,这个我们就叫做,因为后边是叫作为事件时间嘛,因为看简称我们叫做ET吧啊,那ET它应该就是基于TS,把它这个长整型的数据啊,转换成一个我们想要的时间属性字段的类型,Time s或者time s l TC。
14:41
一个比较简单的方式啊,在我们这个文档里边使用的是S,然后我们说这就是一个计算列嘛,啊相当于基于已有的数据要去生成一个新的列,那么这里直接as后面调用的是two time STEM l TZ方法,那么它呢,可以传入一个长整型的数值进行转换,转换成time l TZ啊,那我们现在如果说不想这样做啊,我们想要直接比方说就调这个。
15:06
To time stamp方法。就直接想要做这样一个转换,那行不行呢?啊,直接把TS传进去肯定是不行的,因为我们说啊,Two time STEM这样一个函数里边要的是一个年月日十分秒那种格式的时间字符串啊,它里边传的是一个string,所以呢,那我们需要先把这个TS啊,Big in类型的TS转换成一个标准格式的string,然后接下来再去传给这个two time step转换成一个time step啊,这个方法就稍微的绕一些啊,但是也可以啊,弗Li给我们提供了对应的这些时间转换的函数,我们这里用到的这个函数叫做from,然后下划线unix time。也就是转换成标准的UTC时间的时间戳啊,那我们知道这个标准UTC时间是基于1970年1月1号零点开始啊,到目前为止的一个时间戳,一个秒数或者毫秒数,标准的这个时间戳应该是一个毫秒数啊,而这里边from mux time这个函数呢,里边传数的它要的是一个秒数,所以假如说啊,我们前面的这个TS本身是一个毫秒数的话,那怎么办呢?怎么得到秒数呢?诶,那还得做一个除以1000的操作啊,这样的话得到的就是一个真正意义上的啊这个unix time啊,所谓从1970年1月1号伦敦时间标准UTC时间开始计数的一个描述,然后转换成的一个time step,那然后接下来呢,得到这个time step之后,ET就可以作为当前的时间属性字段,所以接下来呢,啊,我们还应该有一个watermark的定义。
16:46
Water mark。那么后面它的写法呢?首先要指定当前的时间字段for ET,然后也是一个。计算列的形式as后边,哎,它的表述到底怎么去写?哎,那后面是基于ET,然后要减去一个时间间隔interval。
17:04
比方说哎,我们这里给一个三秒钟的延迟,或者两秒钟的延迟都是可以的,二然后second。这就是我们完整定义的一个过程。相当于是在这张表定义这个DDL啊,创建表的DDL里边去追加了两个列啊,那这两个列呢,一个就是当前的时间属性字段ET,它是基于TS做的一个转换,另外一个呢,诶,其实另外一个不能叫专门的列了啊,是water rock,我们根本没有办法直接用它,那这里面增加的这个ET呢,是可以单独用它的,是真的可以把它拿出来去进行处理,进行转换,去进行使用的,它的类型就是P3。啊,当然这里呢,我们这两个列之间还是需要有一个逗号去进行分割的啊,这里边有一些小细节,在书写CQ的时候还是要多多注意啊,那这里我们可以做一个测试啊,来看一看当前我们拿到的这一个表里边它的结构,它的所有的列的属性字段啊,到底是什么啊,我们记得之前table不是有一个方法叫print game吗?哎,那就可以直接把它打印出来,我们现在呢,是在表环境里边注册了一个even的table,那需要把它拿到对应的表的实例对象,哎,那还应该基于table env去调用一个from方法。
18:24
把这个表名传进来,得到一个table的对象,接下来调用print s,我们来看一看得到的到底是什么。运行一下啊,当然了,现在我们其实没有这个基于表的处理流程,所以后边其实不做任何的数据处理,我们只是打印一下,看一看当前这张表里的东西到底有哪些,我们看到现在它的字段就是有这样的五个,除了UI durlts之外,还有我们追加的一个ett,我们看它的类型就是time step3好,然后后边还带个星号RO泰,这是它的类型啊,指定当前就是事件时间与一下的时间属性,然后最后呢,还有一个比较特殊的,那就是water mark for ET,好啊,这个其实不能算是一个单独的字段了啊,它一般情况我们没办法直接用的,他就是默默的帮我们指示着事件时间的进展。
19:16
这就是在DDL里边创建表的时候,直接去指定事件时间属性的过程。
我来说两句