00:00
好,那接下来我们就开始一步一步的进行啊,Table API和这个flink CQ的调用测试,那这里边首先我们要创建一个table的执行环境table environment啊,那前面我们在做这个示例程序的时候,大家也已经看到了啊,非常简单,它需要用的是什么呢?用这个string table environment,它里边的一个静态方法就是它的构造方法create对吧?哎,直接去把它创建出来,里边它需要一个参数就是env,就是当前我们的流式的执行环境对吧?把那个stream execution environment要传进来,所以它的这种处理方式我们说类似于啊,Spark streaming,我们创建的时候基于那个SC去创建这个SPA streaming的context,对吧?啊,这是可以做一个类比这样的一种方式,那么对于表的执行环境而言,它到底有什么意义?到底有什么就是作用呢?它其实是我们现。
01:00
在做这个table API和NKC的一个核心概念了啊,大家看到后面我们去注册表对吧?诶我们去去定义这个表的一些转换,然后执行那个,呃,查询操作这些东西其实全是基于这个表环境来做的啊,所以大家看到主要的一些用途干什么呢?可以注册catalo,这catalo是什么呀?Cato大家知道字面意思是那个目录对吧?啊,就是在Q里面也有catalo的这个,呃,这个这个这个呃,MYSQ里边可能不太去关注这个catallo的概念啊,在有一些数据库里边啊,你像这个Oracle啊,啊,像其他的一些这个数据库里边应该都有这个catallo的概念啊,它是字面理解就是目录,它相当于是什么呢?是就是一组所有的这个。Database数据库和里边的表table的所有的这个原数据的一个集合,也就是说我这个目录里边一目了然,对吧,我就列出了当前我有哪些的数据库,有哪些的表,都在这里边统计着呢,你要想我们当前的这个执行环境能够知道,呃,你要操作的哪张表到底在哪里,那首先得在这个cat catalo里面去做注册啊,所以说主要的这个操作那就是注册,可以先注册catalo,然后呢,在catalog面在注册表,这都是基于这个这个表执行环境的,另外就是后面我们执行这个CQ查询的时候,对吧,做各种各样这个表的操作的时候,也都是基它,另外就还有后面我们讲到要定义这个用户自定义函数的时候,Udf的时候,也是基于环境去做定创建的啊,这是关于这个表的执行环境,那接下来我们还是在代码里边给大家来做一个测试。
02:48
接下来在table test下边再去新建一个。当前我们就叫呃table API吧,就我们这算是正式的这个一步一步开始测它的这个API了啊好,先把这个写出来面方法。
03:12
然后接下来我们在调用的这个过程当中,大家知道创建环境啊,第一步我们是创建环境,创建环境我们要创建表执行环境,前提是前提是必须要有这个流式的执行环境,对吧?Execution stream execution听错了,Environment get,把它get进来啊,那防止后边我们用到影视转换,我提前把这个先下划线,先引入对吧?然后接下来这里边我们就是该做的这个设置大家还可以做对吧?比方说这里边我做一个这个并行度直接给设成一,这个是完全没有问题的啊,或者说你不设,那就是并行度默认还是四,对吧,这个是就是我这里边还是默认的开发环境是CPU的核心数量,这个还是没有问题的,然后接下来我们要基于它去创建表环境,那最简单的方式就是刚才我们已经用过的。
04:12
Table env定一个这个东西,那需要去得到一个stream table execution,呃,Stream table environment啊,那它这里面没有get方法,而是有一个create方法,对吧?这样直接把env一传,这就没没毛病了,这是最简单的调用方式。那诶这里大家可能就会有疑惑啊,就是说嗯,你现在不是说我们现在有两种不同版本的planner吗?那你现在直接就这么笼统的上来之后就create,然后把这个传进来,就就把它创建出来了,那我们当时不是说那个,呃,就是在这个表执行环境里边,你要去做这个CQ的分析执行,最重要的一个组件就是planner嘛,对吧,这是我们的核心嘛,那它到底是调用哪个planner呢?
05:04
哦,其实我们能想到这里边默认情况下应该是要调用这个old planner对吧,老版本的这个planner,那假如说我们想指定这个新版本的planner,那应该那应该怎么样去做这件事情呢?呃,这里边大家如果感兴趣的话,你可以追到就是当前我们的这个呃,Stream table environment里边,对吧?呃,大家可以去看这个最后得到的我们去去得到这样一个,呃,就是stream table environment,在这个里边,它可以去去做各种各样的注册register function对吧?另外还可以从一个你看from data stream,就是可以从一个数据流里边做转换得到一个table,大家还记得之前这个写法吧?啊另外呢,还可以去,还可以注册data stream,你看还可以做这些事情啊,另外就是我们前面用到的,可以去创建一个临时的视图temporary对吧?啊,另外还可以我们前面。
06:05
讲过这个可以去做这个转换成流的这种操作,对吧?之前我们不是说这个把一个table要转换成流吗?啊,那后面我们还可以执行这个q update,可以执行这个insert into,这些都是基于当前的这个stream table environment来做的操作,好然后这里边接下来我们给大家看一看,就是那假如说这些操作我想换一个planner,对吧?啊,那那这个东西又该又该怎么去处理呢?啊,这里边我们就挨个一个一个都给大家来做一个,呃,做一个实现吧,呃,那这里边你点到这个create方法里边来,其实可以看到它有不同的重载方法。在这里边我们看到有不同的重载方法,对吧?那你只传一个stream excuion environment的一个参数的时候,哎,那这里边当然就是说直接直接这么去去就是返回这么一个东西就就完事了,对吧?哎,那还可以传什么呢?还可以传另外一个参数叫setting,或者是传一个table con,大家知道这就是一些配置项啊,这些配置项里边就代表你当前可以去指定哪个planner,或者说可以去指定你要用批处理还是流处理,因为我们说现在这个,特别是这个blink版本啊,它不是已经批流统一了吗?那批流统一了,我统一都基于这个,呃,底层的都是这个data stream API,那我假如说上面要要用这个table API去做批处理,那又应该怎么定义呢?就是在这个setting里边去做配置就可以了,好所以接下来我们这个给大家先一个一个测啊,这个就是1.1。
07:48
首先我们来给大家测一个这个,呃,基基于老版本planner的流处理啊,那有同学说你前面不是说这个默认就是这样的一个老版本的流处理嘛,哎,那当然了,就是我们现在把它展开对吧,你看看如果要是展开写的话,刚才我们看那个源码里边,因为没有下载源码,大家感兴趣也可以就是把源码下下来看一看它到底里边长什么样子,对吧,到底是怎么去执行的啊,那这里边我可以定义一个这这就得定义一个setting对吧?啊,当前我这个是呃,老版本的这个我就直接就叫settings吧,这个settings得传一个environment environment。
08:39
这个啊,然后大家看这没有大这一个类啊,它的这个类里边的构造方法,你看到这是一个私有化的构造方法,对吧?Private又来了,诶,那怎么办呢?诶,这里边有一个公开的方法,Public方法是new instance对吧?呃,这里边new instance当然就是返回当前的一个,哎,大家看它是一个builder对吧,返回了一个builder,那这个builder又怎么样又去用呢?当然就是有builder.build了,对吧?builder.build就能返回一个当前的environment settings啊,那这里边你当然就可以配置各种各样的东西,你看可以在什么样的模式对吧?In batch mode in stream mode,然后可以use什么样blink planner,或者use old planner,可以做各种各样的这些配置啊,所以一般情况我们就是配这两项啊,那接下来我们就现在。
09:40
你有一个instance,那接下来我们就use老版本,然后in streaming mode,对吧?呃,默认情况下其实就是这样,最后不要忘记build,哎,把它创建出来,调用build.build得到一个environment settings,这就是这样的一个调用方式啊,然后接下来我们如果创建这个table env的时候啊,啊,这个就算是,呃,我们就管这个叫做old table env对吧,而且这个是old stream啊,Stream table env啊,那调用的时候这个怎么样传呢?那就直接table stream table environment create,还是调这个方法,我们传的时候就把前面的这个流流逝的执行环境传进来,另外再把当前的这个settings传进来,而且效果跟上面这个其实是一样的,对吧?这源码里边它其实就是简这样的一个简写,传的参数就是。
10:40
这里边配置的这个old planner和stream mode,那接下来我们就继续看,那假如说我要这个基于老的版本,如果我想去做批处理,那怎么办呢?我们先先看一下老版本啊,基于老版本的批处理,哎,那这里边同样我要去定义一个,呃,就是当前的一个这个有同学说,哎,我要定义一个setting,但不是的,大家想一下,老版本里边我们的那个批流,其实它并不是统一,都基于那个流逝的执行环境的,对吧?啊,在老版本的这个planner里边,它跟这个blink不一样啊,Blink是真正的批流统一,这个API统一起来了,都基于流失执行环境,而老版本呢?哎,那批处理还是基于之前的那个批示的执行环境的,所以在这里边我还得去建立一个批示的执行环境,对吧?啊,这个叫batch env,把这个定义出来,然后这里边execution大家还记得。
11:40
对吧,我们一开始在做做这个word count的时候,就给大家讲过这两个区别,呃,这个execution environment,你直接把它引入这里边,我们获取到的就是一个P式的执行环境,然后get execution environment,对吧,把这个拿到,然后接下来我定义这个old的batch batch。
12:05
然后table env,诶,那么这里边我们要调的时候得用什么呢?就不是stream了啊,得用batch table,大家看到这个batch table environment,然后调它的create方法,它是分开去调的,对吧?然后这里边可以传一个当前的batch烟尾传进来,你看这里边调用的时候,它需要的也是这个excu environment,对吧,没有stream的,这是批处理的,这个执行环境必须传这个,这是老版本啊,老版本,所以这流和P处理就不一样,然后我们再看看新版本,哎,1.3基于哎,也就是blink啊,Blink planner的流处理,哎,那这里边我要重新我们知道这个,既然它都是批流统一了呢,都基于这个stream,呃,当前的这个流逝的执行环境去创建了,那是不是就还是这里边传一个不同的setting,就可以了。
13:06
所以这里边我们再定义一个settings啊,当前这个是啊,Blink stream settings,同样in environment setting,我们去new instance,然后里边大家就知道了,我怎么样去做呢?肯定就是use blink plan planner,对吧,然后再去in STEM mode,把这个创建出来就完事了啊,然后这里边我们这个,呃,定义一个当前的这个这个blink stream table environment,同样还是stream table environment,后边create,把这两个参数传传入对吧,流逝的执行环境,以及当前blink streaming,呃,Stream,当前的这个settings啊,那那这里面大家看到这个这里面报错,那是因为我们前面光指定了这个planner和这个stream mode,你还没有做这个build对吧?
14:06
一定要把它build出来,这得到的才是一个environment settings,这是关于这个blink planner的引入啊,我们要做这个基于blink去做处理的话,就是这样去做了,那那与之对应的另外一个大家想都能想到怎么做,对吧?啊,这个我直接copy一下吧,我把上面这个直接copy过来啊。几乎一模一样,接下来1.4,我们基于它做一个批处理的话,那后面我们这应该怎么样呢?还是用blink planner,对吧,这里边就应该是in Bach mode,在P的模模式下去做这个处理,对吧?这里大家要稍微注意一下,就这里边blink,我们这个叫back,是table environment,那后边呢,你就不能直接基于stream table environment,这是流式的,呃,这个表环境对吧?你现在最终要创建出,创建出来的应该是一个批示的表环境啊,所以说你不能用stream,呃,Table environment去创建,那用什么呢?直接用这个table environment,也有点像我们之前那个定义一样,对吧,就是前面没有stream的话,那这里边就是一个,呃,当当前的这个这个环境就是一个批示的表的环境,然后呢,也不需要大家看啊,也不需要传当前的这个流逝的执行环境,只要把当前的。
15:27
把这个改一下啊,这个叫blink batch settings,我们把这个blink Bach settings传进来,这样就没问题了,这就是这个基本的环境的定义,不同的环境的定义。
我来说两句