00:00
接下来我们就开启新的一段旅程,我们开始讲这个flink的高级API table a和flink CQ,这一部分呢,呃,其实主要就是分成两部分对吧?一部分是table API和另外一部分呢,就是flink CQ,但是我们往往会把它们放在一起来说,呃,这两种API其实在flink里边是非常非常紧密的啊,我们一般认为他们底层是一样的,所以说一般会认为它们是一体的,也就是调用方式的话可能会有所不同,但是整体的理解思路都是一样的。那接下来我们就来看看这个table API和flink CQ到底是什么。啊,那首先呢,弗林可本身大家知道是批流统一的一个处理框架啊,那所以说对于这个处理操作而言啊,在当时我们讲到这个三层API这个结构嘛,啊,那底层的话,那是其实是process方式,然后上面是区分了这个data stream和data set对吧?啊那对于我们处理而言,有时候就会觉得其实就是中层的这个API啊,整体来讲调用还是比较方便的,但是呢,有时候我们这个data stream是一套API data set,我们做批处理的时候又是一套API,这感觉还是有点有点不舒服,或者说有些操作呢,跟大家熟悉的那个习惯还是不一样啊,它是流处理嘛,我们大家更熟悉的还是表的操作啊,甚至于大家想直接写CQ,对吧,这种方式是作为一个大数据程序员,可能是最为熟悉、最为舒服的操作方式。
01:37
那flink呢,也是在这方面给大家提供了便捷的调用啊,那这就是所谓的统一的上层API table API和flink CQ,那这两部分呢,又稍有不同,那table API它是一套内嵌在Java和skyla语言当中的这个查询API啊,所以说在这个调用的过程当中呢,大家会发现它其实跟我们那个做流式调用的时候有点像,它也就是基于我们定义一个table这样的一个类,对吧?呃,就是这样的一个类型,然后呢,基于这样的一个table就可以后面点点点啊,就是直接往后面去这个点什么点什么链式调用它的一些处理方法就可以了啊,你可以去点select,可以点CQ对吧,呃,可以这个呃,做各种各样我们在表操作里边能做的这些,呃,所有的处理都是可以的。
02:30
啊,这是table API啊那。关于这个,呃,这个flink CQ呢,它的特点就是说我们直接可以写一个字符串里边就是CQ对吧,就是完整的一句CQ,那那这个东西直接用别的方法直接把它执行起来,诶,然后直接就可以把它当成一个那个呃,Flink的流失处理程序来跑啊,啊这种方式就是大家可能会加直观啊,更加这个方便一些,Flink的CQ呢,它的支持是基于这个阿帕奇k set的啊,因为大家知道这个KET是实现了这个C口标准的嘛,所以基本上。
03:09
CQ标准CQ里边支持的一些语法,一些函数,一些方方法,这里边flink CQ都是支持的,所以说整体来讲,现在flink CQ的知这个能够做的事情啊,能够调用的方法比table API。呃,还是要稍微多一些的啊,就是它的这个现在能用的这种呃功能更多,但是呢,现在整体来讲,这个tableable API,无论是tableable API还是fli CQ,现在还都不是特别的完善啊,就是之前大家知道就是在呃1.9之前啊,呃,那个阿里还没有开放内部的blink版本,那当时其实就功能更少啊,各个大厂其实都是有对于这一套东西是有不同的实现的,自己那个呃定义的那个函数方法对吧,各式各样啊,这个都不统一,1.9之后现在的功能渐渐变得可用了啊,阿里把这个公开了,大家都用它的标准,呃,但是呢,里边这个呃没有实现的地方还很多,就是不能实现的功能也还很多,它还在快速的发展完善,这个大家在官网上也能看到,这个说法啊,呃,大家看这里边有一个黑体字,这是这个table API和CQ的vie啊,首页这里边有一句话说,注意please note。
04:26
给大家简单翻译一下这个英文啊。当前的这个table API和CQ啊啊,Not yet feature complete complete,对吧,什么意思,就是还不是一个功能完善的状态。然后它是are being activity。Develop就是还处于这个非常活跃的开发状态啊,所以这里边呢,就需要注意,呃,这里边就是还有很多这个应用啊,还还是不支持的,对吧,所有的这个操作方法,有些是现在还还不支持,所以说呃,这个大家就是还是要对flink呃有一点这个信心,也是要有一点耐心,以后的版本肯定这个功能会越来越完善啊,当然以后的版本可能调用的方式又会有所不同,大家要时刻保持一个开放的心态啊,我们这里是就现在的版本啊,一点十的版本给大家做一个介绍。
05:23
呃,那呃,接下来我们看一看这个就是table API和link CQ,这个到底在代码里边怎么样来写吧,我们来先写一个简单的实现啊,看看它到底是一个什么样子啊,我们这个就直接在代码里边新建一个,新建一个包吧。呃,我们直接创建object加上包名。这一部分我们叫做这个table test。Table test,呃,先写一个非常简单的事例啊。然后在这个代码里边,我们还是先把这个main函数写出来,呃,然后要写这个flink table啊,Table API或者是flink c的代码,呃,这里边大家需要注意一个一个点,就是说我这里边必须要引入新的依赖,就是在开发环境里边必须要引入依赖啊,引入一个。
06:19
我们可以先看一下之前的这个。这里边应该是有这个POM依赖的啊,大家看这里边需要引入什么呢?引入一个flink table planner和一个flink table apila bridge。诶,这两个这个用途主要是干什么玩意儿呢?这个planner大家知道啊,字面翻译就是一个计化器对吧?啊,这是这个flink table的一个核心组件,核心部分,也就是说我们这个table API它的一个,呃,我们需要根据所写的这个CQ,或者说table的这个链式调用方法生成一个执行计划啊,就是对于这个类似于我们CQ里边的那个执行计划,对吧?然后呢,我们接下来要能把这样的一个执行计划翻译成流处理程序啊,或者说批处理程序里边的这种,这种就是最后的那个执行图,对吧?啊,所以它相当于是给我们提供运行时环境啊,生成执行计划啊,这个最核心的部分,这个就叫planner。
07:20
那这里边我们引入的这个planner,这是什么呢?呃,这是当前这个是老版本的planner啊,就是呃,本来这个就是1.9之前就是叫叫这个版本啊,现在还同时提供现在一点十版本了,也还有直接这个flink table planner这个东西,那还有既然是有这个管它叫老版本啊,那就还有新版本,什么是新版本呢?看这官网大家看到啊。这里边还有一个新版本叫做link table planner blink,对吧?这个blink后面加一个就planner后面加一个blink,这就是当前这个blink代码里边给我们提供的可用的一个blink的plan啊,就相当于大家可以理解成这个就是引擎不一样了,对吧?我可以换个引擎啊,就是在做这个后面我们提供运行时环境和生成执行计划的时候,呃,可以换这个planner。
08:14
那一般情况现在啊,基本的一些功能啊,现在在老版本的这个planner里边也都已经提供,也都已经实现了,但是有一些比较特殊的功能呢,那就必须得用blink版本啊,就现在这个官方比较推荐的还是先使用这个,就是早期的这个老版本,因为blink的这个版本呢,还不是大家看这里边这一句啊。呃,如果你要是这个生产使用的话,生产环境的话,还是比较推荐recommenddy的old planner对吧?呃,就是在1.9池之前的那个老版本的这个planner,现在的使用更加稳定一些啊,Blink里边是功能更强大更丰富,但是现在不够稳定啊,所以这个还是还是会不停的发展变化啊,还是那句话啊,大家就是随时调整,持一个开放的态度。
09:07
呃,然后大家看这里边还有一个东西叫这个skyla bridge对吧?这又是干什么呢?啊,这其实bridge是一个桥接器嘛啊,它主要就是说我们这里边不是有那个table API吗?呃,那我们之前底层是那个data stream API啊,那他们之间是需要有一个关联的,然后呢?呃,他们俩之间的这个连接和转换又按照语言分成Java版本和skyla版本,所以大家看官网的话,这里边有这个Java bridge和skyla bridge对吧?呃,就是这两个,这就是你要用什么语言去开发的话,呃,加入什么样的这个支持就可以了。其实这里边大家看到这个组件写的很多啊,还有common对吧,还有这个table API Java table table API,其实这里边啊,平常要使用的时候,直接引入这两个就够了,而且甚至怎么样呢?啊,就是我们直接引入这个planner一个就够了啊,我们这里边给大家引入先看一下啊,在pop文件里边把这个。
10:07
Planner先引入进来。然后把这个引入之后,大家其实在这个maven项目的这个依赖里边会发现啊,就是它其实那个bridge。调节器其实在这个里边包含了,我们点开看一下啊啊,大家看这里边有common对吧,也有,甚至这个Java bridge和sky bridge都已经提供了啊,所以这里面其实我们没有必要专门再把那个引入啊,大家如果要是呃官方推荐的方式还是把这两个同时都引入啊,这个他可能是考虑到有一些版本可能之前没有没有带着那个bridge对吧,这个就是大家大概知道就可以啊呃,那另外还有就是在生产环境里边,大家可能会想到生产环境里边我需要再单独提供这些东西吗?啊,现在的这个一点十版本其实已经不需要了。我们看一下这个flink下边啊,我如果进入到这个library啊,Lib目录下边的话,看一下当前它其实就已经有了flink table的这个抓包和flink table blink的这个抓包,对吧,这两个支持就都已经有了,只要在现在的版本,就是大家直接一解压,下面默认就有这个啊,不用你再去下载,所以说我们在这个呃运行就是在在我们这个生产环境里边啊,大家提交这个table API和flink SQL代码的时候,其实没有什么问题的,没有别的依赖,需要专门引入直接执行,这些都有了。
11:34
啊,所以这个其实还是比较简单的啊,整体来讲还是比较简单的。呃,所以接下来我们就看看这个具体代码到底该怎么写吧,啊,关于这个就是两种planner啊,就是老版本的和blink的这个区别,最大的区别就是blink。的这个版本真正的实现了这个批流统一,就是老版本的这个table API,它的这个planner啊,它其实是就是一套方案,是把这个批处理程序,然后转换成data stream程序,然后呢,呃呃,就是流流处理啊,流处理程序转换成data stream程序,而批处理是要转换成data set对应的那一套程序,然后来调底层的这个,呃,Data stream data set这个API,然后去去做处理,对吧?啊,那对于这个blink而言呢,它就完全批流统一了,就连底层不光是上层我们的调用统一,这个调用它这个底层都都是这个,就是中间我们那一层都是批流统一的,它就是所有的代码都转换过来,是流处理程序,Data stream程序,这就没有那个转换成data set的那个过程了,而且就是它还提供了更多的这个,呃,更多新的这个方法,对吧?啊,它支持了一些其他更多的这个应用啊,这个就是大家大概知道是。
12:52
这样的一个情况就可以了,然后接下来我们来写这个简单的example这个事例啊呃,在这里边首先我们给大家写这个事例呢,还是基于流,先创建,然后把流转换成一个表,大家看看这个表的操作,这个是大家可能呃最能接受或者说最简单的一种方式啊呃,那所以前面这个流的话,该配的环境还是一样,我这个就直接抄过来吧,大家知道这个都是一样的嘛,对吧?呃,为了这个转换方便,我还是把这个下划线后面这个影视转换先引入对吧?啊,然后接下来我还是那个读取数据转换成样例类啊这个。
13:30
我我直接从文件读取吧,把这个map成样例类转换过来啊。这个sensor reading,把这个引入我这里把这个直接打开。然后啊,我这个这个流失处理这里边先先把这个删掉啊,呃,然后接下来或者大家如果要是想用这个流逝的输入来测试的话,其实也是一样,我们这个就给大家先看一看这个效果啊,因为从文件读取这只是一下子,呃全部输出而已嘛,然后我们这里边map成样粒类之后,现在还需要干什么呢?这里大家注意啊,前面是我们把这个流。
14:09
都已经创建好,然后呃做好了,现在我们如果要做这个表的操作的话,诶,这里边首先要基我们写一个啊,基于env要去创建一个表的执行环境,对吧?就像我们前面你做这个流式处理的时候,我要先去创建一个stream execution environment一样,现在呢,我要做那个表的操作,就先要创建一个table environment啊,所以有点像大家之前那个我们讲那个SPA streaming的时候,对吧?啊你你基于那个SC,基于Spark contact去创建这个,呃,这个SPA streaming的context这个这个其实是类似的这样的一个状态啊好,那所以我们这里边就是创建这个表环境对吧。创建的过程其实也非常简单,我定义一个table env,呃,那么它是一个什么呢?我这里边定义的是一个stream table environment,对吧?大家看是这个东西,所以我的调用方式是stream table environment,大家看它没有那个get方法,但是有一个create方法,所以这里边可以create,里边传什么呢?可以直接传之前的那个流逝的执行环境啊,所以直接把这个env传进去完事。
15:27
呃,这非常简单啊,然后接下来有了这个表执行环境之后,就可以基于基于它去创建表了,那我们当前的创建表的方式是什么呢?因为我们已经有流了嘛,对吧,所以我们可以。基于这个table env。烟啊。将硫转换成表,哎,所以大家可以看一下我们这个转换过程也是非常简单,之前我那个叫data stream,现在我叫data table,它是什么类型呢?就是一个table类型,这个要引入一下啊,大家注意这个我们用flink table api.table把它引入啊,然后这个我就直接table in啊点诶这里边有一个方法叫做from data stream啊,就是从一个流来做转换,转换成什么呢?呃,当然是要转换成一个table对吧?啊,这个大家如果点进去会发现它得到的这个结果,当然就最后是返回了一个table类型,对吧?啊,那么这里边这个table就是一个特殊的一个定义了啊,这是一个Java接口,大家看到啊。
16:38
这是一个Java接口啊,所以说我们这里边返回的是什么呢?当然就是一个,呃,实现了这个Java接口的一个具体的一个tableable类对吧?啊,就是一个表的一个类,那这里边传的参数就是一个具体的data stream了啊,大家注意这个table这里啊,这个接口是没有没有那个泛型的,没有类型参数的啊,大家之前那个data stream的时候,我们定义的时候还应该后面得有类型对吧?中括号括起来,这里边没有。
17:06
所以这里边就是直接。这么写就可以了,里边把这个data stream传进来,诶就这么简单转换成表。所以现在我们就得到了一个table,所谓的table API是什么呢?就是基于table去调用的一整套方法,对吧,内嵌在Java或者SKY语言里边啊,那所以我们这接下来就相当于是调这个Java方法一样啊,这个表里边定义好的方法就是table类定义好的方法啊,现在我们看啊。调用,呃,我这里边啊,给大家这里边写上这个步骤吧,前面是我们那个。这个我们叫第零步啊,因为这是。跟这个表没关系的一些东西啊,这个叫呃创建创建流执行环境,读取数据并转换成样例类。
18:05
类型啊,这是我们前面做的这个,就是相当于一些一些预处理的步骤吧,然后接下来做表操作的时候,是先基于env创建一个表的环境,对吧,然后呢,基于这个table env,我们定义的这个表环境,把流转换成表。然后接下来那就是调用。Table API。做转换操作。啊,那当然转换操作之前我们所有的那个,呃,转换操作啊,Transform基于data stream做转换操作得到的,你兜兜转转啊,绕来绕去,最后还是data stream对吧?那所以这里边如果我们做table API的转换操作的话,那是不是也是兜兜转转绕来绕去还是table啊,所以这里边我们得到一个result table啊,它同样还是一个table类型。
19:00
啊,然后这里边就可以基于data table去做操作了,比方说做什么?呃,我们这个大家知道,这里边这个流里边我们的字段怎么定义的呢?Sensor reading吧,有一个ID,有一个时间time Sam,还有一个温度值temperature,对吧,比方说我就做一个非常简单的操作,我想呃,就是提取出来,这个只只提取啊map成一个二元组,只要这个ID和temperature。然后呢,我还只要那个SENS1对应的那个,呃,所有的数据,那就相当于有一个map,有一个filter,那这里边如果我们用这个表操作怎么做呢?啊,这里边就是直接s select。大家看这不就是我们熟悉的这个,呃,对于表的这个查询操作吗?呃,Select里边可以传这个字符串啊,所以这里边大家看我直接给可以给一组啊ID temperature,诶直接写进去select这两个字段,然后可以怎么样呢?呃,Filter对吧,Filter这里边还是一样,我可以给一个ID等于啊这里双这个要把里边这个也要引起来对吧,字符串类型还要引起来三一。
20:10
哎,这就是我们做了这样的一个非常简单的操作。啊啊,那如果说这个最后要打印输出的话,呃,大家可能会想到,诶,那table能不能直接打印呢?呃,不可以不能直接打印,那怎么样可以直接在控制台看到这个输出呢?我再把它转成流就可以了,对吧,所以我这里边啊。最后一步。我们把。表转换成流呃,打印输出。呃,那所以这里边我们再定义一个result stream对吧?啊,那这就是一个data stream了,这个data stream按照你刚才的这个定义,因为它有类型的嘛,按照我们这个定义,Table如果要做转换的话,转换出来是什么类型呢?哎,应该是一个二元组类型对吧?一个ID,一个temperature,那里边的类型当然ID是string temperature就是double。
21:10
诶,这样把它定义好,然后它就应该等于我们这里边把这个result table做一个转换,怎么样做转换呢?哎,这里边我可以调一个方法,调的这个方法需要去引入一个影食转换,又又要有了什么饮食转换呢?大家看又是把这个啊执行环境在的这个包啊,把这个下划线引入,就是之前我们是那个flink streampi GALA,然后下划线,对吧,现在又要引入一个,呃,Flink table apis GALA下划线把这个引入,然后大家看到就可以调用一些影视转换的方法,有一个叫做to aend string。哎,这样的话,我就可以把它转换成一个,这个是一个追加流对吧?呃,因为这里边就涉及到这个表跟流的一个一个转换啊,这个深层的含义我们后面再给大家讲啊,那这里边的这个追加流大家会想到这就相当于什么,我因为本身这个流里边的数据是源源不断的嘛,那所以这个表又是怎么样的呢?本来它是从流里边转换过来的。
22:12
所以我们前面流如果来了一个数据,这表里边相当于就多了一条数据对吧,那就本来应该是在后边,就是这个表不断增长的,所以后边呢,我们把它转换成一个追加流,那就相当于就把每一条数据再追加的那条数据再输出到这个流里边,做一个输出就完事了啊,所以这个其实是比较直观叭,较简单的一个表和流的转换操作。啊,那这里边拓stream,我们也可以把这个它的类型定义出来啊。String double对吧,哎,这就是这个整体的这个定义,后边我们可以把它做一个。打印输出result stream print来,这就是这个过程,最后大家不要忘记执行对吧?Execute当前是table API。
23:02
Example。Job。这里面我们运行一下,大家看看这个结果跟我们想的输出这样一个二元组啊,是不是一样,我们先跑一下试试啊。好,大家看到现在已经运行起来了,哎,果然就输出了每一个,我们提取出来这个二元组对吧,一个ID,一个temperature,我们要的不是只有三一的数据吗?所以就只输出了三一的三条数据,别的数据什么三六三七三十没有输出。诶,所以这个非常符合我们的预期啊,这个结果跟我们直接做那个流处理的转换是一样的,没什么区别啊,这里边大家可能会发现,就是我们在做这个table转换的时候,居然对这个table表的结构一点都没有定义对吧?啊,根本就没有定义ski码,它它这里边直接就可以选它这个字段提他这个字段了,那这个是到底是哪里定义的,从哪里来的呢?啊,这个原因就在于我们的data stream本身里边是样例类类型对吧?这个样例类类型里边相当于就已经有了现成的STEM嘛,所以这里边我们转换的时候直接from data stream转换过来什么都不要定,定义直接掉就完事了。
24:17
这是最简单的一个用法啊,呃,然后可能有同学觉得这个就是使用起来不是特别的舒服,但这个就是说,呃,看使用场景啊,我们有些场景下可能觉得这样会更好,那或者更多同学可能会觉得,诶,你既然是这个table API和flink嘛。那能不能呃,就是我直接写一句CQ把这个搞定呢?我们更喜欢这种方式,哎,这里面给大家举一下这个直接写CQ的例子,所以这里边这个3.1对吧,我们这里边其实是第三步。第三步是转换操作。就要得到结果对吧,呃,得到。
25:01
提取结果啊,那首先3.1,我们是调用这个table API,然后我们在同样的啊,用它等同的这个这个方方式啊,结果一样的这个方式调用一下link CQ啊写。呃,CQ实现转换。我们看看这个CQ怎么写,呃,CQ这里边写的话就稍微麻烦一点,因为呃也也不是说稍微麻烦一点啊,就是说前面大家需要考虑的要多一个步骤,为什么呢?因为之前我们拿到的是一个table对吧?Table API就是基于这个类去做的操作,那你如果要直接写CQ的话,那flink环境里边,他怎么知道你这里边是一个Java类呀,他怎么知道这个我当前有这样的一张表呢?呃,所以这里边得多一步操作,就是我还得先去呃当前的表环境里边注册一张表啊,或者说创建一个表,对吧?啊,这里边给大家看一下这个注册表的方法啊,那大家也能想到有一个方法应该叫register table啊,但是这个方法大家看被弃用了,就是这个是之前这个,呃,老版本里边的写法啊,就注册,直接注册一张表,大家看就可以把给一个name,给一个表名,然后把我们当前的一个table注册进来。
26:19
然后就可以写CQ调要用了啊,但是现在不推荐用这种方式了,推荐用什么呢?叫create temporary temporary temporary view对吧,创建一个临时视图啊,这个就看大家喜欢用什么方式啊,我们后面再再详细说,这里边大家如果觉得这个简单的话,我们就用这个弃用的方法啊,直接注册一张表就算了啊,那后面我们就可以注册一个,比方说我这个就叫。呃,就就叫data table对吧。这个随便叫什么都可以啊,然后把前面的这个data table传进来,这个名称是没没关系的,你随便给,那接下来就可以直接写CQ了,比方说写CQ,我最后可以得到一个这个,呃,还是可以得到一个table,我写一个result CQ table啊啊,那这里边还是类型是table。
27:08
怎么转换呢?注意这个调用CQ的话,就得用环境的table table env表,环境的一个方法叫CQ query对吧?我们当前写的是查询嘛,直接写一个CQ query啊,那这里边怎么写啊,那那就非常简单,前面这不就相当于是select这两个字段对吧?然后where ID等于341吗?啊,这简直太简单了啊,然后现在这个表也有了嘛,之前我们比较难受的就是不知道该from哪张表,现在这个表已经环境里面有了,所以接下来啊,或者大家觉得直接在后面写太长的话,我直接可以写这个就是三引号啊,三引号这样的方式,直接把它这个空开,这么写select,呃,这个ID。Temperature。Temperature对吧,然后from from data table。
28:03
Where?ID等于SENSOR1对吧?哎,直接这么一写就完事了。呃,这里边我们可以把这个也做一个打印输出,我现在不要用这个,呃呃,这个。Result result stream stream还是一样定义啊,只不过这里边我用result c table做一个输出,看一看它得到的结果是不是一样就可以了。跑一下。好,我们现在大家看一下这个运行的结果跟刚才一模一样啊,所以说我们用这个table API,或者说直接写一条CQ这个方式都是可以实现这个功能的啊,那如果大家觉得这个直接写CQ的方式更舒服,更符合大家的这个习惯的话,那以后也可以用这种方式去做。啊,这是我们先有一个整体的认识。
我来说两句