00:00
然后接下来我们就准备给大家讲这个另外一部分flink里边更加高层级的API,就是传说中的table API和flink CQ啊,那这一部分其实涵盖的内容还是比较多的,那我们的任务呢,主要就是还是以一个就是应用的层级,因为它是比较高层级的API嘛,我们主要是考察它的应用方式,呃,然后对于这一部分API大家需要去做一个整体了解,就是我们可以先看到在官网上的一个介绍,呃,大家会看到啊,这是这个table API和弗Li CQ的一个基本介绍,这里面大家看到黑体的有一行字啊,这个说什么呢?就是你需要注意一下当前的table API和CQ,还不是not yet feature complete complete,也就是还不是功能完善的一个版本,对吧,还在非常活跃的开发过程当中。
01:00
啊,所以大家要注意一下,就是现在的这一个tableable API和flink CQ呢,其实还是一个在不停的更新调整啊,大家还是要怀着一个比较开放的心态啊,当然就是最近的几个版本,其实更新的都非常多啊,变化也非常大,我们只是给大家介绍一下原理,然后让大家知道实际怎么样使用,这个其实就达到目的了,呃,在这个以后的工作当中,可能更多的大家的使用还是要直接上手去写CQ的,所以这部分还是需要有一个了解啊,那首先我们来看一下这个一个基本的概念啊,就什么叫做tableable API和弗link CQ啊,那我们说这个之前我们也已经知道了啊,就是flink API里边给我们提供的有三层API,分层API,这一层呢,这就是最顶层的API了。那flink table和。呃弗link CQ啊,这两个其实API关系非常的密切,我们可以认为它们就是一体的,就是在去调用的时候,他们的效果也基本上是完全等同啊,所以我们会放在一起来给大家讲,给大家说,呃,那flink里边呢,本身是一个批流统一的处理系统,对吧?我们说它的底层是把所有的数据都当就是数据流啊,都当成流来处理的,P认为就是一个特殊的流,一个有界流,所以说呢,为它的这个上层处理呢,也提供了统一的这个上层的API,那这里边有两种不同的这个API啊,就是一个是table API啊,就是我们看这个table API,它的特点就是内嵌在Java和skyla语言里边做这个查询转换操作,所以说呃,它的特点呢,我们看起来就是非常像skyla语言里边的某种集合类型的这种链式调用的转换一样啊,它。
02:54
可以以非常直观的方式来组合一些,就是我们在CQL里面操作的一些关系型运算符,比方说这个select对吧?啊,比方说我们这个,呃,做做一个filter啊,做一个其他的这些开窗的操作啊,都可以以这个类似于C的语法去用这个table的方式去做转换操作,这是table API,然后另外呢,还有一个就是非常直观,也是更加大家可能更加熟悉的方式,就是flink CQ,呃就是它真正就是你去写CQ去执行CQ了,它的底层呢是实就是基于实现了CQ标准的阿帕奇t set对吧,那里边就是实现了这个CQ标准,那么flink基于它就可以解析CQ,直接去执行CQ了啊那我们再回顾一下这个三层API,对吧,现在是一个最顶层的这样的一个状态,那当然了,这在发展的过程当中,早期的时候啊,Table API和link CQ,其实它的功能是非常非常少的,就是我们。
03:54
说在这个,呃,1.1.81.7之前啊,呃,基本上里边就是能做的操作是比较少的,呃,提供的这个函数这些方法也非常的少,那一般情况下大厂呢,都是要基于底层API自己去做二次开发,对吧?那没有统一的,那大家各玩各,这个就是应用起来就不是很方便了,那真正的变化是从什么时候开始呢?就是从flink1.9开始,我们都知道阿里对吧,这个国内的巨头在flink社区开始发力,阿里把自己内部的这个tableable API和fli SQ的实现版本blink开源贡献出来啊,那当时这个1.9的版本,呃,当时据说是有这个接近150万行的代码更新啊,那主要都是集中在就是blink这一部分,那所以说接下来我们大家统一去用的话,就有了一个统一的标准了,而且功能也越来越多,越来越完善丰富了,啊,当然就是说现在这个。
04:54
还在不停的变化,对吧,但是我们相信这是呃未来的一个趋势啊,就是会越来越多,越来越丰富,我们可能直接调用它这个直接写一条CQ就可以把很多需求都搞定了啊,但是并不是说我们底层的API就不重要,大家对于理解底层的原理以及有一些需求,我们如果用这个CQ搞不定,那你直接用底层的这个API还是比较有必要的,好啊,那接下来我们就来给大家看一看在代码里边怎么样去写这样的一个table API和弗林CQ的程序。
05:31
我们直接在代码里边啊,新建一个。API test下边直接创建一个object,呃,然后我们这里边,因为这是另外的一类嘛,我们新建一个这个table test这样的一个包啊,然后下边我们给大家举一个简单事例,就叫做example吧。好放在下边,然后一个幂函数啊,首先前面的一些处理流程,我先把之前的这个处理的过程都引入,直接抄过来对吧?啊,这里面东西太多了,我直接用这个吧。
06:11
好,我可以直接呃截取到当前的做转换处理,转换成卖成样一类这样的一个呃,一个状态,对吧,先把这个先引入,然后这里边我这个干脆也不要从这个soet的文本流里面去读取吧,我直接用那个文件去读取吧,这个稍微简单一点啊,大家等一下看一下这个测试的效果就可以了。把这个东西引入。把当前的这一个做一个对齐啊。好,呃,然后大家不要忘记,就是前面该引入的影视转换要引入对吧,后边要做的这个执行要写进来,然后这里边我们是tableable API example一个简单的事例,大家先感受一下这东西到底该怎么用啊。呃,然后前面我们该做的这些操作还是啊,先用这个流的方式先把它读取出来了,然后接下来呢,诶,我们就会想到,那你如果想要去调用这个table API,或者说这个呃,Flink CQ,写这个flink CQ的话,那你直接去写能能用吗?其实不能的,你或者说你你有想到TABLE1篇,那应该有个table啊,有有这么一个类吗?哎,你这里边如果要去引的话,当然这里边就是有对应的这个类型可以去调,对吧?但是你看到这里边都是些什么呀?ES里边给我们定义好的,对吧,或者说skyla兔子里边定义好的,或者说这个,呃,引入的这个,你看呃,Google的这个瓜网。
07:51
这个这个包对吧,或者是其他的这些shad的这个包啊,那所以这里边并没有flink给我们实现的底层的这个table API里面的那个table,所以当然我们是要引入相关依赖,对吧?啊,这里边需要引入的依赖,依赖大家也可以看一下这个文档里边的介绍,文档里边需要引入的依赖是什么呢?就是主要是两个,一个是flink table planner,然后后面是SKY的版本,对吧,下边是这个flink的版本,也就是当前这个planet的版本要跟flink版本一致,另外还有一个是flink table API scalela bridge啊,那对于这个bridge大家知道是调接器嘛,对吧,调接器就是一个语言的一个转换的一个工具,对吧?那对于这个Java和skyla呢,对应的它主要就是提供我们这个table API和底层的data stream或者data set API做转换操作的时候提供一个连接支持,那所以。
08:51
会有Java和SKYLA2种版本,你想用什么样的bridge连接器就就引入什么样的版本就可以了,同样后边skyla版本2.12啊,这里边我们的这个就是弗link的版本是一点十点一啊,这里边给大家说一下,就是这个planner是什么呢?Planner是计化器对吧?啊,计化器的意思其实要就是要干什么呢?这是table API里边最主要的部分,它会给我们提供这个做table API或者弗Li CQ运行的时候的一个运行时环境,然后呢,它会基于这个运行时环境解析我们当前的流失处理程序,然后生成一个表的执行计划啊,因为大家知道在关型数据库里边,你去呃写CPU对吧,查表做操作,其实也都是要生成一个执行计划,然后去做执行的嘛,所以这里边的这个planner主要就是做这个工作,这是我们的核心啊,所以呃,官网上面给大家的介绍是引入这两。
09:51
个包,其实呢,在直接引入的时候,直接给这个planner就可以啊,就我们可以先把它引入大家看一眼啊,直接放在这个文件里边。
10:04
直接在这个后边放进来,然后我们看一下。刷新一下。哦,大家看到这里面已经有了,对吧?这里面有这个flink table planner2.12,呃点呃,这个1.4.0的版本,然后大家看到里边其实它自己就有这个Java的bridge和SKY的bridge对吧,自己就带着的啊,所以说你这个就是其实不赢也可以,如果要引入的话,你还得注意那个版本千万不要有冲突啊,这里边是这个平常的这种使用,然后对于在集群环境里边,那你要有这样的一个依赖,怎么样去做做这个操作呢?哎,那其实我们可以看一眼当前这个集群环境啊。大家还记得在这个集群环境里边,我们在做看这个目录的时候,有一个目录叫做library对吧?Library,呃,这里边大家看到有一个包就叫做flink table2.12,一点十点一对吧?哎,所以说当前的这个其实就是我们主要当前这个table API的planner里边的东西都已经有了,所以我们已经有了这个包,那就不需要再加东西了,呃,我们下载下来安装的时候,这个就在里边,不需要单独下载,所以说呃,不需要任何的额外操作啊,然后大家其实也会发现了,这里边还有另外一个东西叫做flink table blink2.12,诶,那这又是什么含义呢?呃,这里边要解释一下,就是当前啊,呃,就是老版本的这个,就是以前我们用到这个table API的这个计化器,跟blink版本里边的计化器是不一样的,所以说这里边就会有一个区分,一个叫做old planner,一个叫做blink planner,那所以呢,你如果要。
11:52
套用blink planner的话,其实还得专门引入一个blink的依赖,Blink的这个支持的包啊,那这个引入的时候其实也非常的简单,在这个po文件里边怎么引入呢?啊,就是它的这个版本就是直接这样,就是后边给一个这个planner blink把这个引入就完事了,然后你会看到在当前我们的这个po文件里边,这个就有了一个table planner blink,对吧,这里边它基本上需要的这些组件基本上都差不多,你看这个Java跟sky bridge的这个这个依赖也都有,对吧,这这里边重复了,它就直接去掉了啊,然后它还有一些其他的一些东西,它的实现的功能要要多多很多啊,所以说呃,之前是这个,呃,Blink本身它的这个planner呢,不是特别的完善,所以说官方官网上推荐的时候,推荐大家使用还是要去用这个old planner的啊,就是有些特别的。
12:52
的功能应用的时候,呃,那你就只能用这个blink,那就没办法,那那你就去用这个了,然后从刚刚新出的这个一点十一开始,官网上推荐大家是直接用那个blink的planner,当然现在还是有两种planner的提供选择,对吧,你这里边两种想用的话都还可以用,但是现在就是blink完善度更高了啊,大家可以紧接着就是呃跟跟进当前这个弗link c整个的发展的过程啊,看看这个东西到底该怎么用。
13:22
好,把这个东西一一引入之后,现在如果我们再来看这个table的话,诶,你就会看到多了一个选择,就可以选这个flink table api.table了,对吧?啊flink table下面的这个这个包了,所以当然这里边我们想要的就是它啊,接下来我们就是基于它去做这个table API的一些调用啊,那这里边我简单的给大家写一写,看看这个东西到到底该怎么用啊,我先把这个删掉,哎,那如果说想要去调用这个table API的话,首先之前我们不是说这个planner给我们提供了一个这个表的执行环境嘛,对吧?所以首先我们要创建表执行环境,哎,那这里边这个表执行环境啊,跟我们前面的这个流式处理的这个stream,呃,Execution environment,其实差不多的啊,只不过呢,它是要基于我们之前的,因为它底层是流处理嘛,基于之前的stream。
14:23
Environment来创建当前的这个表环境,那这个表环境它是个什么呢?它是个stream,它叫table,我们现在都都是流处理嘛,Stream table,大家看到environment,对吧?然后这里边它的方法,它不是get,有一个方法叫做create,然后create里边大家看到了要传的参数,是不是可以直接传一个stream execution environment呀,对吧,就把前面的env传进来啊,所以这个就有点像什么呀,就有点像我们这个在呃,Spark做操作的时候,我们先创建一个SC对吧,Spark的呃,Context,呃,Context的,然后接下来基于它再去创建Spark streaming的那个context,对吧?呃,就是一层一层的这种包装的环境的创建是可以做这样的操作,可以做一个类比啊,先把它创建出来,然后接下来呢,诶,我们基于流创建一张表啊,那接下来我们看看前面我们不是已经有这个流。
15:23
那么data stream了嘛,那接下来我们来定义一个data table data table当然就是我们已经想要的这个table类型了,对吧?大家看看怎么做操作,基于tableable env环境,然后呢,它有一个方法,大家看有一个from,然后有一个from data stream,对吧?这个from是个string,我们当然不知道这个string这表示什么含义了,我们现在用的就是from data stream对吧?直接从当前的这个data stream里边把它做一个转换,直接转换成一个表,哎,你看这里边的这个方法啊,最后返回的当然就是一个table了,对吧?呃,当然就是大家如果想看这个源码的话,也可以下载下来去看啊,这里边我们主要是知道应用就可以了,然后接下来就可以调用调用APIAPI进行转换,比方说我们做一个非常简单的转换操作啊,我定义一个result table。
16:23
那当然这个肯定得到还是table了,我就不写了啊,就基于当前的这个get table去做转换,大家看这个table就没有没有泛型的定义了啊,因为我们说它就是表而已嘛,里边的基本数据类型应该是什么呢?啊,应该是肉是吧,大家想到应该是都是一行一行的数据了,所以说这里边我们就不去单独定义了,然后这里边定义了这个我们基于this table去做转换,哎,可以做什么样的转换呢?哎,大家看这个能做的操作就多了,对吧?各种各样的这个方法都可以调了,我这里边直接来一个select对吧?比方说我现在就要这个ID和当前的temperature可以不可以呢?诶,当然是可以的,直接把它选取出来对吧?另外我还可以啊,有一个这个filter对吧,就相当于是where,这里边是这个,比方说我要ID啊,等于当前的这个三,诶这里边啊要要等于这个341,比方说啊,那这里边就得。
17:23
用单引号再来引了对吧,因为外边有已经有一个双引号嘛,3ID等于三一,这样把它选取出来。利用这个条件啊,那后边我们就可以去啊,把这个对应的这个table,那那有些同学看到了这个table,你打的话只有print print STEM对吧,只有它的这个结构的一个打印,并没有最终的一个print输出啊,那怎么办呢?诶这里边大家不要慌,可以做转换,这个转换呢,又要引入这个相应的一些影视转换,所以说类似于我们之前这个streaming API scale下面这种引入方法啊,就现在API table API scale下边的这个环境,同样它下边那个包里边,我们把下划线引入,接下来就可以做这样的一些影视转换,比方说我可以转换成什么呢?Toend string,对吧,把它转换成一个这个叫做添加流对吧,追加流转换成这样的一个流,然后接下来里边需要给一个当前的这个类型,那我们提取出来是啥呢?不就是ID和temperature吗?二元组类型对吧?String double放在这。
18:30
哎,这就是转换成一个流了,流的话大家知道就可以print输出了,对吧?哎,直接打印输出就完事了,这个是result,诶就就就就这么简单,这使用就这么简单对吧?啊,大家看这种写法,这就相当于实现了一个什么,这不就是从这张表里边选取这个字段,然后where这个吗?啊啊,那有些同学可能想了,那哎呀这个还是觉得有点不爽,我能不能直接用CQ去实现呢?可以的啊,给大家快速再写一个直接用CQ实现,那用CQ实现的话,这里边稍微有一点麻烦,就是说什么呢?我还得先去把这个表去做,做一个注册,为什么呢?因为当前的环境里边,你看这张表就是从流直接转换过来的,对吧,它是个table数据类型。
19:16
就是这个table数据类型,呃,它并没有在我们当前的表环境的,所谓有一个这个catalog就有一个目录啊,这个里边并没有去把它管理起来,我如果要想写写CQ的话,必须是在那个目录里边有注册的,才可以直接CQ里边去调这个表名,你不能直接用这个,呃,这里边我们的这个数据类型table去当成呃这个CQ要调的那个表对吧?啊,所以这里边我得稍微做一个转换,就是怎么样呢?用table env去create,相当于注册表啊,但是大家看我用的方法是create一个temporary view,就注册一个视图,比方说这个名我就还叫data table,对吧,然后后边我去把这个this table放进来做一个转换,然后接下来呢,在CQ当中我就可以直接用这个名了啊,有这样的一步转换操作,那接下来我看一下啊,比方说我定义一个定义一个CQ啊。
20:14
呃,Value,一个CQ,那大家知道这个就应该是一个string了啊,直接写,写一条CQ,这个我们select,大家想前面那个不就是select ID temperature啊,然后from from哪张表呢?Data table,然后where ID等于,哎,这里边341对吧,不就是这么一张表吗?呃,不就是这么一条CQ吗?然后接下来我们执行一下它,然后我定义一个CP,基于还是基于之前的啊,啊不是基于之前的data table做转换了,大家看啊,接下来执行CQ的话,就不是基于table去做转换了,这是table API对吧,我我就执行这条CQ不就完了吗?哎,所以直接用环境对吧?Cable env有一个方法叫做来看CQ query对吧?直接把这条query的这条CQ传进。
21:14
出来执行就完事了,呃,然后同样我可以用前面的这种方式,把这一个result table也做一个打印输出,这是一个CQ对吧?哎,现在大家可以看一下这个效果到底怎么样啊,看一下执行的结果。大家看现在我们已经输出了运行的结果了,好,大家看现在我们输出的是不是就全都是二元组,都是341的数据,对吧?因为我们filter过了嘛,然后这个result和result CQ里边输出的结果都一样,对吧?因为我们这个写法就是完全等价嘛,一一种写法是调用table API,另外一种是直接写CQ,这就是这一部分上层API的用法,大家先通过这样一个例子有一个整体的印象。
我来说两句