00:00
那我们再来给大家讲解弗link里边另外一个非常重要的概念,哎,我们看一下他在table API和弗link CQ里边到底怎么体现,那就是时间特性,呃,那提到时间特性,其实大家之前想到的应该是时间语义,对吧?啊,就是如果说我们要定义跟时间相关的操作的话,比方说窗口,诶大家想到之前在那个data stream里边有窗口操作,那你要在CQ里边是不是也应该有窗口操作啊,哎,这个其实是很平常的一个需求啊,那如果说你用到了时间相关的这个窗口的话,时间窗口的话,那是不是我们就必须要定义到底是什么时间语义?另外假如说我们用到了事件时间语义的话,是不是那个时间字段也得单独去提取去定义啊,因为我们说事件时间,我们表达那个事件时间进展的那个,呃,我们本来要用那个watermark对吧,它本身是基于数据的时间戳里边提取出来的。
01:00
所以如果说你不去单独指定的话,我们当然是不知道当前的事件时间是什么啊,所以这里边大家注意一下,就是我们同样在table API和flink CQ里边同样可以做这样的一些操作,可以设置不同的时间语义,然后提取数据里边的时间戳字段,然后呢,加入到整个的这个table的处理过程当中啊,这里边有这么一句话,大家理解一下,就是table呢,它可以直接给我们提供一个逻辑上的时间字段啊,也就是说本身我里边这张table里边定义好的字段可能就三个,像我们那个传感器的话,ID temperature,还有一个time stamp,对吧?然后接下来呢,它是在这个基础上,大家注意啊,有同学可能说,哎,那time stamp这本身不就是一个时间字段吗?呃,当然你也可以直接把它定义成这就是当前的时间字段,对吧?呃,另外呢,就是table API给我们有了这样一个选择,就是。
02:00
我可以再去追加一个逻辑上的一个字段,就是这个字段看起来好像是凭空出现的,对吧?这本来我数据里边根本没有,但是我现在追加一个字段,这是个什么字段呢?就就就是当前的时间字段,接下来如果我用到跟时间相关操作的时候,就以这个字段为准,对吧?啊,它用来指示时间和访问相应的时间戳,那这里面我们的这个时间属性,或者说时间特性啊,Time attributes可以就是每个表STEM里边的一部分,如果一旦我们定义了这个时间属性的话,接下来大家就可以把它当成表里边一个正常的字段来做引用。啊,而而且就是说可以给他基于时间操作啊,做各种转换,做做各种计算这些都是可以的,他的行为呢,就是类似于一个常规的时间戳,就是CQ里边的time sta那个类型,那所以这个接下来我们做的操作,这个就是在表里边又引入了时间的概念,那接下来我们就具体来说了,首先最简单的就是处理时间processing time对吧?那大家知道在处理时间这个process STEM语义下,其实我们说处理这个执行一个任务的时候,我们判断当前时间的候用的就是机器的本地时间,那所以这就是最简单的情况了,它不需要提取时间戳,不需要生成owa对吧?啊,那我们给大家介绍几种不同的定义处理时间字段的方法啊,第一种,这也是最简单的一种方法啊,大家看到就是由data stream转换成一张表的时候,直。
03:41
直接指定,大家还记得我们这个呃,Data STEM转换表的那个过程吗?非常简单,就直接就是一个调这个env的一个from data streamam方法,对吧?啊,在这个里边我们可以把那个data stream传进来,最后得到一张表,然后后边我们还可以去,大家记得就是可以按照名称,按照位置去指定字段,对吧?啊,或者说也可以不给,就是相当于默认把这个里边我们那个样例类或者说元组类型,一个一个都匹配到当前的这个字段里面去了。
04:11
那大家看到现在我可以怎么样呢?后边可以直接追加一个,你看本来不是ID temperature time Sam吗?现在如果是处理时间的话,这个time Sam其实也没什么用,对吧?我给他要追加一个时间字段,然后这个字段比方说随便给一个名称就叫做PT,就是所谓的这个processing对吧?直接我把它定义出来,然后注意它后边,因为你这里边如果什么都没有的话,直接就来一个P,那肯定这个就我们的程序就搞不明白了,你你这个data stream里面明明就只有三个字段,你这怎么变成了四个呢?所以这里边要加一个点pro time,就是processing time对吧,表示这个字段就是当前的处理时间,也就是相当于把我们当前这个STEM扩展成四个属性了,而且注意啊,如果说我们定义这个pro time的话,它只能是一个附加的字段,我们不能是把里边某一个time Sam定义成。
05:12
Time对吧,我这边直接time sam.pro time这这个是不准不允许的,我们只能附加一个啊,这就相当于是增加了一个虚拟的一个字段一样,对吧,一个伪列一样啊哈,而且就是说因为它是扩展出来的一个STEM,所以只能在最最末尾最后一个字段去定义,呃,就是之前我们不是说基于位置去定义的时候,我可以省略一些字段嘛,对吧,我如果只用这个ID和temperature的话,诶,那这个时候我要后面再定义这个processing step可不可以呢?诶可以的,你可以第三个位置直接把这个time step省略掉pt.pro time,它就相当于跟time step没关系,我现在是要追加一个处理时间字段啊,这就是这样去定义啊,那我们在代码里边可以给大家先把这个先简单的写一下啊呃,还是在当前的table test下边去新建一个object,当前这个我们是时间。
06:12
呃,特性的一个测试,当然了,时间特性本身没有太大的意义,肯定是要跟后边的窗口合起来测,对吧?所以我们time and的window test那方法里边,诶这个我就完整的再给大家来过一遍啊,首先我要去创建一个执行环境对吧,流处理的执行环境,我们把这个当前这个下划线引入,然后接下来啊,我还是直接把这个先设置好啊,就是并行度不影响我们的这个结果正确,我简单一点啊,后面不要,呃那个打印输出的时候乱了啊,直接就给它设成一,然后后边呢,哎,我我这个就是还可以设置这个,大家还记得这个时间特性吧,Stream,呃,Time characteristic对吧,这里边把这个时间特性可以引入进来做一个配置,但是这里边如果不不设置的话,默认就是processing time对吧?啊,你在这里可以单独给它做一个定义,然后接下来呢,我们要基于。
07:12
当前的这个流处理环境,然后去创建一个表的执行环境对吧?啊,那这个表的执行环境我直接tableable啊,Environment对吧?调这个create方法里边把这个env传进去,这样的话就可以了啊,当然大家如果要是想要用啊,就是方便我们以后做这个呃,调整的话,呃,这里边我们可以把那个settings定义出来,大家记得这个settings的写法吗?呃,Environment environment setting点啊,New instance,就是它本身没有这个构造方法,被私有化了对吧?哎,我们要直接去new得到它的这个build,然后去调它的build方法,那一般情况我们在里边要配置几个,比方说我们用这个use old planner对吧,或者我这里边直接用这个blink plan planner也是可以的啊,大家要注意一下,就是一点十版本的时候,如果我们。
08:12
什么都不指定的话,默认里边使用的是啊,这个old panda还是老版本的,而01:11的话,呃,就是我们最新的这个版本blink已经比较稳定了啊,觉得默认情况下你什么都不配置,直接这么创建的话,它用的直接就是blink planner啊,所以这个大家可以稍微的区分一下,然后这里边我们一般情况是in streaming,呃,Mode对吧,然后去点build去做操作啊,那平常情况下我们可能以后就直接这样一句话把它搞定就完了,那现在如果说大家想要去做一个这个具体的指定的话,可以把这个settings传进来啊,这样就定义好了当前的这个表环境啊,然后接下来呢,哎,我们可以先把当前的这个数据先读成一条流,对吧?哎,这我还是。
09:03
直接借用我们之前那个example这里边的这个这个处理流程吧,啊大家还记得这里边我就直接读取数据,从这个呃文件里边把它读出来,后边呢,先转换成样例类,对吧?这里面我直接过来这个就不详细去做了啊map成我们当前的一个reading这样一个样例类啊,那得到这个样例类。呃,得到这个样位之后,后边我就可以直接把它做一个调整转换,对吧?我想得到这样一个sensor table,那这个ssor table直接就用cable env,然后大家看到from data stream,把当前这个data stream写进来,对吧?后边这个字段我就可以指定了ID啊,第二个字段这里边本来应该是那个time step,但是我其实可以大家知道我可以用这个名称对应的话,它的这个顺序就可以改变,对吧?我可以把这个temperature直接提到前面来,然后诶,这里边大家看看报错,为什么呢?你必须得有影视转换才能用当前这个scalela的simple来表示我们这里边的表达式,对吧?呃,这里边得引入影视转换,那是把这个留的stream table的environment下划线啊,这个改成这个下划线,把这个引入,然后这里边就可以把它当成表达式了,然后后边啊还可以再去time stamp,诶把这个引入诶后。
10:27
那如果是当前是处理时间的话,引入一个字段PT啊,这个前面这个字段名称无所谓对吧?诶你叫什么都可以,你叫TS也可以啊,这个无所谓,我我们这里边就是简写的话,要区分它是事件时间还是处理时间嘛,啊教程PT可能稍微的代码可读性好一点,然后要第2PRO time把它列在这,这样就指定里边就多了一个这个时间字段,然后接下来大家可以稍微的看一下啊,哎,我们知道最后如果你想看它里边的东西的话,我们有一个print stemma这样的一个方法,对吧?可以看到里边的这个整体的结构啊,数据架构是什么表的这个结构是什么样的,另外我还可以直接把当前的这个转换成to一个append stream,对吧?啊,然后我这里面直接就用这个格式给大家输输出。
11:19
Link types肉啊,然后print直接把它打印输出,最后不要忘记把这个当前流式程序执行起来,对吧?啊time and的window test,好,我们现在来执行一下,大家看看这个效果怎么样。好,大家看到运行起来之后,这里已经输出了当前我们表的结构对吧?哎,当前表的结构是什么呢?多了一个PT这个字段,ID是string对吧?这里是,那这里边多了一个TIME3对吧?诶,那那这这里边就是当前我们追加的这个,而且后面你看它是它的这个类型还是一个pro time对吧?呃,特别指定的一个time,就当前的这一个processing time时间字段,它本身你可以当成一个time s这个数据类型直接来做转换啊,这也是可以的,然后大家看到下边我们就把这个全输出了,对吧?当前得到这个数据就全部输出了,那这里边我们的这个具体最后的这个字段是什么呢?这就是一个时间戳嘛,你看对吧,Timemp如果要直接打印输出的话啊,这就是一个标准化的,一个年月日十分秒的,当然这里边是毫秒,对吧,大家看为什么time time stamp3呢,这就是表示我们当前的。
12:36
这个时间戳精度当前是啊,就是三位毫秒数这样的一个,就是以毫秒作为单位的一个呃精精度单位的这样的一个时间戳的表达,对吧?啊,但大家知道还有这个time step6,那就相当于是六位小数,那就应该是微秒啊,对吧?啊还有time sam9呢,就相当于是纳秒啊,这个有更更深更精度的,精度更高的这种表达,我们这里边一般常用的就是这个time sam3啊,毫秒表示的一个时间戳,这就是关于定义这个时间字段的这样的一个过程啊,这是给大家讲了一种最简单的方式,就是说我们先得到一个流,对吧,再把这个流转换成表的时候,这里边直接去做这里做这样的一个转换,因为大家知道在做这个呃流的这个定义的过程当中,我们这里边字段都已经定义清楚了嘛,所以说这就不涉及到任何的。
13:36
我还要想去做这个,呃,STEM的定义啊,对吧,或者说做其他的一些定义啊,这什么都不涉及,你只要把这个字段定义清楚,然后后面把这个想要用到的这个时间字段列出来就完事了,所以这个其实是特别简单的一种方式,那大家可能想到,如果说我要是不用这个理由的转换方式呢?哎,我用的是之前对吧,这个connect连接了外部的文件,文件系统把它读进来了,那在这个里边我要怎么样去定义一个字段,当当前是这个呢?哎,这里面给大家把这个列举一下啊。
14:11
接下来大家看到还有第二种方式是什么呢?可以在定义table schema的时候,直接指定当前的pro time啊,大家看这种方式就是我这里边不是ID time Sam,还有temperature对吧?有这样的三个字段吗?然后数据类型都已经定义好了,后边我直接追加一个字段,叫做PT还是一样的对吧?啊,就直接在后边追加,注意它的类型必须是time sam3必须是这个类型对吧?呃,毫秒为单位的这个time,然后后面呢,加一个,哎,这个追加的表达点pro表示当前的这个后边,这里边它是一个就是所谓的这个processing time的一个时间字段,时间。时间的这个表示时间的这个特别的这个特时间属性对吧?哎,这是这个比较,呃,也是一个比较常用的一个方法,但是这里边呢,有同学可能下来之后大家可以做一些尝试啊,就是你如果说比方说我们这个,这不是这个文件读取数据源吗?我如果在这直接去定义这样一个pro time,你如果直接去运行的话,它到时候可能会报错,为什么呢?呃,这里边大家要注意一下,就是我们还是回到之前给大家说的这一个,呃,CS大家知道当前我们这个呃文件的这一块都是csv cable sourcece cable thinkk,对吧,它的底层都是这样的一些东西,所以说我们可以看一下当前的这个csv table sce,它实现的这些呃接口和实现的这些,呃抽象类有哪些呢?看实现的有一些这个,呃,就是stream table sce对吧,首先是我们这个table sce的一个流式读取数据源的啊,这样的一个接口。
15:59
另外还有best table sce啊,还有当前这个,呃,就是我们做这个批处理的,呃,Table sce啊,这也有,然后还有lookable,呃,Table sce对吧,就是你可以去做这个查询,做做这个便利的这样的一个table sce啊,那还有这个,呃,Projectable table sce对吧,各种各样的table sce,但是大家注意这里边呢,它没有能专门允许我们去定义时间属性的那些接口,它没有实现,所以说这里边啊,你看代码里边直接去这么写的时候,它其实这里边不会报错,但是如果执行的话,就有可能会报错了,因为它本身没有没有去定义当前的这个某一个字段,能啊,当然这里边我不应该直接在time,呃,这个time上面直接去做这个操作,对吧?大家知道这里边我要做的操作应该是重新去定义一个PT,这个随便叫什么都行,对吧,然后给一个time stamp3给一个这个数据类型。
16:59
然后后边把它定义成这个pro对吧?啊,但是这个运行起来会报错,那什么地方就不会报错呢?诶这个就给大家看一眼卡夫卡,呃,这个table sauce,之前我们看过的是它的那个,呃,Sink对吧?Table SK,这里面我们再看一下S,它这里面实现了哪些接口呢?有table sce对吧?流失的这个,然后你看到它没有实现那个批处理的那个啊,因为我们说这个作为这个SS员的话,卡夫卡本来就应该是消息队列嘛,本来就是一个一个数据来了之后消费,呃,这个队列依次这个先进先出去去读取就完了,它当然不应该把它当成一个批处理的源,对吧?哎,所以这里边它就没有实现那个批处理的那个接口,另外大家注意它还实现了什么呢?DeFined pro time attribute就是可以去定义当前这个处理时间的时间特性,对吧。
17:59
另外还定义了一个实现了一个接口叫DeFine row attribute,那与之对应的这个row time大家想到了应该是就是定义事件时间时间特性的那个接口,对吧?哎,所以这里边就是如果实现了这两个接口的话,里边你用它的这一个table sce啊,在这个描述器里边直接去定义是合理的,如果它没有定义的话,你这么写它还会报错啊,所以说就是呃,有时候可能这种方法就是还不太好用的话,那可能我们更多的,嗯,你就干脆直接把它转换成流,在这儿直接定义出来不就完事了吗?这个万无一失对吧,这个就肯定没有问题,然后除此之外呢,另外还有一种方式,之前我们不是说过这个还有DDL里边,呃,创建表的那种方式嘛,对吧?呃,连接外部系统的那种方式嘛,那在这个里边怎么样去定义呢?也可以直接去定义,大家看就是create create table的时候,这里边不是有对应的这个字段吗?我在后边直接加一个字段,比方说加一个。
18:59
PT对吧?然后后边pts注意调这个函数pro time这个表达式,就是说我追加一个PT这个这个字段,它的类型是什么呢?类型就是proty对吧?我调用本身底层的这个函数,告诉告诉当前的这个表里边这个字段,它就是处理时间的那个时间特性字段,这里大家要注意啊,这个as pro time,这其实是一个什么呢?这是就是blink里边引入的一个新的概念,叫做计算列的概念,就是说可以在表里边我们直接产生出数据源本身里边有的那个STEM stemma里边没有的数据,没有不存在的列,我可以把它计算出来,然后添加在当前的STEM里边啊,所以你如果要用这种方式的话,那就必须得用这个blink planner了,对吧,你如果要执行这个DDL的话,就必须得用那个blink啊啊。
19:59
大家看后面我们是这个file system对吧,Type是file system,然后我刚才的这个format type是这个CSV啊,大家下去可以试试这种方式啊,这个也是可以把当前的这个时间字段定义出来的啊,这是关于处理时间的定义。
我来说两句