00:00
接下来这一部分我们就开始新的内容,就是传说中flink里的table API和flink CQ,那首先我们来说一下这两个东西到底是什么啊啊,那这一部分其实就是flink对于批处理和流处理提供的一套统一的上层API,大家还记得我们之前讲的这个分层API的这幅图吧?哦,之前我们说data stream API,这是flink里边中间层核心这个层级的一个API系统啊,我们处理的过程当中,其实处理的都是数据流,都是data stream,当然了,如果做批处理的话,对应的应该叫data set,对吧?Data set API,然后所有的底层呢,用到的是process function process function大家知道它的特点就是在里边我可以访问到当前的所有的数据事件啊,然后可以访问到所有的状态,包括时间,我还可以注册定时器,对吧,对于时间可以有非常精细的控制,另外还可以做这个测试输流之类的这个精细操作啊。啊,这是我们前面讲到的这个核心data three API和底层process function API,那现在我们要讲的就是最上层。
01:10
Flink CQ和table API,那么这一部分大家想到既然它是上层API,那它的目标当然就是让大家要觉得更更加的好用,更加的方便啊,那所以说呢,它会把底层的很多概念和原理性的东西都封装起来,都包裹起来,而给大家提供一个统一的调用接口,然后呢,做一些功能性的扩展啊,那大家想这个如果说它已经直接支持CQ的话,CQ里边我们能调的聚合函数就多了,对吧?啊,就很多操作,你直接写一个写一条CQ啊,里边用调它直接,呃,用它现成的那个函数去做一些操作就完事了,不需要我们再去实现啊,比方说自定义这个map function啊,对吧,自己定义reduce function啊,自己开窗啊,呃,不需要做那么复杂了,那这里面大家会发现我们提到的是两个概念,一个叫table API,另外一个是flink CQ,那么它它俩又分别是什么呢?
02:08
简单来说,他俩其实最终处理的事情是一样的,底层原理,底层结构架构也是一样的,但是在使用的时候呢,它其实是两套API,那么table API,它是内嵌在Java和skyla语言里边的一套表的查询API,它叫table API嘛,基于表的,所以我们就会想到这是在这个Java语言或者skyla语言里边就要有这样的一个table对应的类型,然后基于这样的一个数据结构,是不是可以调它的各种方法啊,对吧,就table点什么点什么去做对应的那个转换查询操作就可以了,所以它是允许以一些非常直观的方式组合一些关系运算符的查询。比方说哎,像我们说这个呃,Select什么from什么,呃,然后VR一个条件对吧,那么对于table API来讲,它其实非常直观,就直接点select就可以了,对吧,去去提取一些字段就完事了,那后面我们会给大家去具体的例子啊,那么对应的这个flink s呢,那就是直接在代码里边写一个字符串,写一条CQ,然后直接执行就可以啊,所以它其实底层就是呃,它的这个CQ支持是基于实现了CQ标准的一个一个组件是阿帕奇的k set,就是table API里边是集成了这个组件。
03:34
这就是关于flink CQ和table API,它到底是什么?那接下来呢,我们给大家还是在代码里边做一个基本的,呃,写一个基本的事例啊,大家先看一看,有一个整体的感觉。我们还是在这个Java下边去new一个class,我们这个要带上包名的,当前应该是com点艾特硅谷这个还属于API test,只不过已经属于table这一层了啊,Table API。
04:06
呃,当前我们这个就叫example吧,一个非常简单的一个事例啊,先把它创建出来,首先这个主方法我还是写出来,需要去throw一个exception,那一开始的过程呢啊,为了跟大家之前的。做一个无缝衔接啊,我们还是用大家更加熟悉的这个流的定义吧,啊,先定义出一个流,然后从流里边把它转换成一个表,然后做操作啊,所以首先要做的还是啊,Stream execution environment,我首先把这个执流逝的执行环境先创建出来啊,那同样还是啊,不是一般性,我们把这个。保证最终那个顺序是正确的,顺序是一致的啊,直接用这个全局并行度设成一,那后边接下来就是先读取数据了啊,读取数据这个还是一样,Env read text file,我这里边还是用那个sensor sensor的数据copy一下这个pass。
05:10
呃,我们把当前这个就叫做input stream。后边基于读进来的这个数据啊,它本来是一个this stream string,接下来我们要把它转换成还是做这个基本操作啊,转换成呃,类似于这个Java并的那个po类型对吧?那接下来这个就是input stream点呃,Map对吧,还是做这个基本转换啊,这这里边我不写那个函数类了,我干脆写一个拉姆达表达式得了,对吧?这个也比较简单,好,每一行进来之后,哎,是不是我首先是定义一个string类型的数组啊,把这个对应的fields要先抽取出来,然后每一行是不是做一个split呀。
06:02
当前是逗号分割,把它先分割开,然后接下来是不是return,要得到一个new,一个sensor reading类型,对吧,我们想要那个po类型啊,里边的话是FIELDS0,就是那个ID字符串类型,然后下一下一个是长整型的时间戳,对吧?所以我们你一个long,然后里边FIELDS1后面还有一个double类型的。F2,诶,这里报错啊,这个写错了中括号啊,这就是我们把它做这个包装转换的一个过程还是一样,那得到这个我叫做data stream,前面大家如果看着这个不舒服的话,我还是改成这个data stream类型,对吧?好把这个写入进来,那有了前面的两步之后,接下来我们就可以基于它去做一些表方面的这个转换操作了,哎,那如果我想想要把它转换成表的话,这里边类似的啊,还是要先去定义创建一个环境,创建表的执行环境环境啊。
07:18
呃,这里边大家看一下这个创建的时候怎么创建,我用一个stream table environment对吧,大家看到有这样一个东西。然后接下来create这里面大家就会想到我既然要创建这个表的执行环境,呃,当前我是有这个接口的啊,Stream table environment,但是我当前现在的这个,呃,就是flink里边有对应的表的这个支持吗?哎,这就涉及到另外一个问题,对吧,这里面其实我是需要去引入依赖的。啊,大家想一下,就是当前我这个link底层它提供的API里边,正常情况下应该只有这个data stream API对吧?啊,当然包括底层的process function,那么table API呢,这是另外要去在上层去提供的,所以我们必须要额外引入依赖,那这里边这个依赖需要引入的就是。
08:10
大家看一下主要就是flink table planner和flink table API API,呃,当然这里面我用的是这个skyla,大家知道如果是这个Java版本的话,应该是Java bridge对吧?啊,应该是这样的一个版本啊,就一个planner计化器,另外还有一个bridge,这个可以认为是一个桥接器啊,啊这样的一个东西啊,所以接下来我们要做的事情其实就是引入它,那这个桥接器,其实这个不是很重要,我也可以不引入,我直接要这个planner这个净化器就够了。大家可以看一下,我把这个依赖先引入。同样这个净化器引入的时候,大家看flink table planner后边下划线2.12,这这是当前这个flink sc的版本对吧?然后下面一点十点一,这是当前,呃,就是flink的版本,都是要对应起来啊,我们把这个引入之后,现在有了一个table planner啊。
09:15
呃,然后这里面其实还涉及到另外一个问题,就是大家还记得我们现在的这个版本里边。应该阿里是贡献出了自己内部的那个blink版本,对吧?诶,它也是一个当前这个,呃,就直接可以跟之前的老版本啊,并行的一个table API的一个净化器,一个一个底层的一个架构,所以说这里边我们如果直接引入的时候,这到底是引入的是谁呢。这是老版本的计化器,然后我们看一下,就是在这个净化器里边啊,大家也可以看一眼,是不是这里面就包含了Java和SKYVA2种语言版本的调节器啊,呃,所以大家看这个计化器planner,它其实就是当前我们去做这个table API执行的一个核心,对吧?它可以给我们生成这个执行计划,然后去做这个对应的呃,整个这个呃执行过程的一个解析,然后可以转换成最后我们的那个流处里的data stream里边的每一步任务,每一步操作,这个是核心,那它里边其实本身就包含着Java和sky GALA版本的支持,这个支持就是所谓的调节器bridge。
10:23
啊,所以我们只引入它后面那个不引也可以,呃,那自然我们就想到了这个,如果要引入这个blink版本的话,那应该怎么办呢。这是这个老版本的对吧,如果要引入blink版本的话,我就同样还是这样啊,Flink官方给我们提供的有这个,呃呃,这个flink table planner,另外还有一个是flink table planner blink,大家看直接就在后边加一个连符,然后blink同样还有后面有这个skyla对应的版本,对吧?还有这个当前flink的版本,就是当前计化器的版本。
11:02
只要把这个做一个引入就可以了啊,然后我们看一下就会多出来这样的一个依赖啊,那在使用的过程当中,就那有同学可能想了,那既然他俩都可以引入,那我现在都引入了,那使用的过程当中到底是用谁呢?啊,就是如果说都引入的话,在一点十我们当前这个版本里边默认用的是老版本的净化器,就是因为之前这个blink版本净化器还是呃比较不稳定啊,功能还是不是特别的完善的一个状态啊,那么现在01:11最新的版本的话,它默认已经是在用blink的这个呃,Planner净化器了啊,所以相对来讲它就相对稳定一些了,对吧?啊这个大家感兴趣也可以看一下这个官网的一些说明啊,比方说这里边是我们看到的这个table API和CQ的这个页面。大家看到这个table API和CQ当前官网上的这个说明啊,呃,前面我们就不说了,大家看一下这个黑体字,这个黑体字呃,也也不用详细看啊,大家只要大概的看一看这第一句就可以了啊,当前他说他就是专门要提示一下note对吧,大家要注意当前的这个table API和CQ呢,是not yet feature complete。
12:22
这是什么意思,就是到目前还没有功能上完全完善,对吧,还是一个未完成的一个状态,然后是啊,啊,Being actively developed还在非常活跃非常速的开发过程当中,对吧?啊,所以大家要抱有一个比较开放的心态啊啊就这里边我们是呃,以后的这个版本可能还会出现很大的变化,所以我们这里边是以这个一点十版本为例啊,那后续的话可能还是要大家做一些调整,你看这里面就是如果呃,对于这种应用场景的。
13:00
应用实例啊,Production use cases,我们现在推荐的是直接用blink planner对吧?啊,就是如果要是在这个1.11之后啊,就是这默认推荐的这个planer计化器,就是blink的净化器啊,那那么在这之前1.11之前,一点十之前,我们这里边默认用的还是老版本的净化器,对吧?啊,所以这个大家大概的了解就可以了,所以我要创建这个表环境的时候,已经引入相关的依赖了,那接下来怎么办呢?啊,接下来是不是直接把这个对应的前面流式处理的这个环境引进来,放在这儿,然后去create这个就可以了,得到的就是一个stream table environment对吧,那就这样的一个东西啊,所以我可以把它叫做table env。定义在这这个过程大家想一下是不是跟Spark里边非常的像啊,呃,你像Spark里边,我们如果要是这个,呃,比方说啊,我们基于Spark还要去呃做这个,比方说去去这个呃调用这个Spark CQ的话,那是不是一开始我们需要有一个s sc Spark的contact,后边是不是还需要基于他再去创建一个Spark,呃,呃,不是那个spaq啊,就是我们想要做这个Spark streaming的操作的话,那是不是还要去创建一个Spark streaming的contact啊?
14:19
对吧?哎,我们基于的时候再去创建的时候,也是基于它去创建的嘛,啊当然了,后边的这个版本,呃,就是Spark2.0之后大家用的都是Spark session了是吧,但是原理其实是差不多的,这里也一样啊,你前面这个我们直接那个流流逝的这个执行环境,这是不是就相当于那个sa啊,Spark context对吧?呃,那现现在接下来我们基于它再去创建这个表环境啊呃,跟之前的那个过程是类似的。有了这个表环境之后,接下来我们就可以啊,基于之前的那个数据流创建一张表,那大家可以看一下这个怎么去创建啊,那就是基于这个table env,然后我可以直接来看from it stream对吧?表环境里面之前我们那个创建流的时候,你不是可以那个呃,就是直接from collection from elements吗?那所以你看这里面基于表环境我也可以直接from流,From data stream,呃,然后里边我就直接把这个data stream传进来,是不是就完事了啊,然后大家看得到这个类型,这个就叫一个table对吧?这个类型就就是一个table,所以基于这个table类型,我把它定义叫做this table啊,基于这个table类型接下来做的操作是不是就都是table API的操作啊啊,所以接下来就是。
15:40
调用table API。进行转换操作,所以接下来我们直接这个data table啊,怎么做操作呢?比方说大家看我现在就各种这个操作啊,可以grew by,然后drawing对吧,然后另外最简单的当然还可以就是直接select提取是不是就完事了啊,所以这里边我就稍微的给大家先简单一点做一个测试啊,我就直接select,然后里边可以是不是指定我要选取的字段啊,那这里面我的字段名称叫什么呢?
16:16
诶,这个我是不是已经包装成了符合flink标准的那个po类型啊,所以这里面的字段我是不是都可以直接提取。因为之前我们那个K的时候,是不是直接就可以取它的字段,这里边也可以直接取,所以这里边我取当前的ID和temperature,尽管是字符串,但是必须字段名称跟之前定义的一样,对吧,直接取出来就完事了,然后另外呃,当然了,大家看我还可以做筛选对吧,这筛选的话,我可以比方说filter,另外也可以where,呃,直接这个这这样where的话,大家想是不是就跟那个写CQ差不多了,这里边的这个API都是有的啊,那我这里边要求诶。要求啊,里边要求,比方说ID等于3341,那大家想一下,我这是不是就是把所有这个3S1对应的那个数据全提取出来了,然后大家看得到的结果是不是还是一个,还是一个table啊,对吧,最后得到的还是一个table,所以我可以把它叫做result result。
17:22
Table大家看这就是这个做基本转换的一个过程啊,啊,然后另外大家可能想到这个用table API的话,就这么简单,就是有了table之后点点点,那另外我能不能直接写CQ呢?哎,我们这个执行CQ,执行CQ这种方式其实就跟我们直接写CQ是一样的,但是在执行CQ之前呢,我必须要在当前的表环境里边有对应的那张表,就注册了那张表才可以,哎,那有同学说我这儿都基于他创建这个表了,这难道没注册吗?
18:02
因为你这个创建是不是相当于只是一个,这是Java代码里边的一个table类型的对象啊,那所以这个并不是在环境里边注册的一个过程,你要想执行CQ的话,还必须专门注册,所以这里边有一个操作,就是我做一个注册啊,是table env,然后大家看它是create temp view view,大家知道是视图的意思,其实临时视图大家想是不是就跟表示一个意思啊,对吧?啊,所以比方说我管这个就叫做sensor啊,这张表就叫SENS4,然后呢,我把data table data table传进来,这样就把它注册进来了,接下来我定义一个CQ。大家想前面我们那一串操作,其实我要实现同样功能的话,是不是本来就是select心,呃,然后from对吧?呃,这个不是心啊,大家看我是提取了ID和TEMPERATURE2个字段,所以是不是select ID temperature,然后from sensor。
19:06
现在有表明了嘛,对吧,因为大家想我这里边直接写C的话,你直接写前面那个this table能不能能能不能直接写啊。这不合适对吧,因为你这里边这个table它是个对象啊,对象的名啊,你直接在这里边写在这个CQL字符串里边是是不对的,那所以这里边我们可以先注册,注册了之后就可以用这个了。From sensor,然后后面就是where ID等于sensor sensor1对吧。哎,这就是我们定义的这个CQL,当然最后我们要把它做一个执行,执行的话还是用当前的这个环境,Tableable env调一个方法叫做CQ query,大家看到这里边传一个string类型的CQ进来,执行就完事了啊,那我们可以得到这个结果叫做result CQ table,这就做完这个操作好,但是大家可能会感兴趣,这个执行的结果我能不能看到呢?
20:09
那我们自然想到,那是不是可以啊,比方说这个像我们之前那个流是不是直接有一个print方法直接就输出了呀,来一个就输出一个,那现在这个表是不是也可以直接输出呢?啊,我们看一眼啊,Result table很尴尬,大家看到只能print STEM只能打出它的这个结构,对吧?当前它的架构能打打印出来,不能直接打印,那怎么办呢?哎,最简单的看到它里边的这种方法,就是因为我们这里边其实处理的还是一个流数据,对吧,我再把它转换成流,就这里边我可以啊突。End。Stream。就是可以可以做这样的一个操作啊,那当然这里边就是如果说我想要直接调这个这这个方法的这个to stream的话,这个是不行的啊,在skyla代码里边,你如果引入一些影视转换的话,是可以直接调这个表的to stream这个转换的,但是这里面不行,这里边必须要基于还是基于环境。
21:19
去调它的,大家看to a stream里边把上边你要转换的那个表传进来,对吧,Result table传进来啊,然后接下来里边大家想到是不是我还应该给一个当前的对应的那个类型啊。大家想想是不是这样对吧,因为你当前我并不知道你当前这个表里面数据是什么类型啊啊,那大家可能看到我现在这个表里面数据输出的时候,这个数据类型是啥呀。ID temperature这这叫啥类型啊?这其实应该是一个二元组类型,对不对,大家想想这是不是应该是个二元组啊,但是二元组比较麻烦啊,那我这里边简单书写的话,你也可以直接用,有一个类型叫做大家看flink types里边有一个类型叫做肉,这是不是就是相当于就是我们这个呃,表里边每一行的这个数据类型啊,对吧?所以你只要是表里面的数据啊,默认都可以认为它是一个就是一行对吧,都是一个肉,所以这里边就是我们的result。
22:24
啊,那同样最后这个table env to STEM,也可以把前面那个result stream做一个打印输出肉点class做一个print,这个是CQ。好,当然最后大家不要忘记是不是该执行还得执行起来啊,大家注意啊,本质最后我们还是留处理,是不是,只不过是中间我们把它形式上转换成了一个表,写了一些CQ类的操作,那本质上大家说这里边我是这个像表一样数据都到齐了才去做的操作吗?其实不是,本质上是不是它还是流啊啊,所以你这里边还是要执行起来,然后事件输入一行一行去读取就可以了。
23:07
啊,我们来执行一下,看看这个结果啊。好把它运行起来。好,大家看一下现在我们这个执行的结果,诶大家看是不是就是二元组,就是我们提取两个字段,那最后其实就是二元组对吧?诶你看它的肉是不是最后也就是按照这个字段逗号分割,然后输出了呀,一个341个温度值,三四温度值提取出来的都是三一两个输出的结果是不是完全一样啊,所以后面我们也会看到table API和这个直接写CQ的方式,两者几乎是完全等价的啊,所以这就看大家习惯了啊,你觉得这种方式用API这种方式更更舒服,那就用table API,如果你觉得写CQ更舒服的话,那就直接写CQ就可以了。这就是关于table API和link的一个简单的示例程序。
我来说两句