00:00
给了整体架构,那接下来我们就分门别类依次进行讲解,首先当然就是环境了,那一开始我们要创建一个表的执行环境,主要就是因为对于flink这样的流处理框架来讲啊。数据data stream table其实结构上还是有所区别的,因为我们想到所谓的table。本身它其实就是这样一个固定的二维的数据组织结构,有行有列,所以我们整体来看的话,一个表,一个table其实更像一批数据的组合,很明显它是更适合于批处理,所以我们会发现在Spark这样对于批处理比较擅长的。工具或者处理引擎里边,它对于CQ的支持就会非常的简单,非常的顺畅,而对于flink,它本身是流处理的话,对于table的支持就会比较麻烦,那所以对table API和flink CQ的执行,它是需要有一个特别的运行式环境的,这就是我们所说的c environment表环境。那它主要用来干什么呢?它主要负责以下一些事情。
01:14
他负责注册我们所说的目录catallock以及table表,另外我们执行CQ的时候是直接调用它的CQ query方法,那由表环境来负责CQ的执行,另外呢,它还可以去注册用户自定义的函数,也就是所谓的UDF啊,我们在flink CQ里边也是可以自己定义函数,然后去调用的。另外关于这个data stream流和表table之间的转换,我们发现from data to data stream都是基于table environment表环境去做的操作。这里的catalog就是我们所说的目录,跟标准Q里边概念其实是完全一样的,它主要是管理数据库database table。
02:00
所有的这些对象的原数据啊,那所以通过catalog我们可以非常方便的去检索到每一张表的基本的信息,我们可以进行查询的管理,可以进行快速检索啊,那在表环境里边,我们不光是可以自定义函数,那也可以自定义表,那另外呢,还可以自定义catalog啊,那所以就是说完整的这个过程,我们其实都是完整的每一张表的具体的路径啊,挂靠在哪个目录下边,我们都是可以去自定义的。而默认情况下,如果说我们不去注册,不去定义catalog的话,那默认的就叫做default catalog啊,那所以其实每一张表它都有默认的目录和数据库,默认情况下就叫default catalog default。这是关于catalog的一个说明,那接下来我们重点是介绍这个table到底怎么用,其实最简单的方式也呃,前面我们在代码当中也已经实现过了,来,我们还是在代码当中去做一个完整的说明。
03:08
首先我们新建一个。测试类。当前我们是测试。一般的这些common API,所以我们就直接叫做common API test。我们首先把这个先定义出来,接下来这个都是一样的。呃,按照我们之前的定义方式,其实在简单事例里边我们也做了,它还是依赖之前的流处理执行环境的,我们可以把它创建出来。然后接下来要做的事情其实非常简单,那就是直接。使用。Stream environment的一个静态方法,Create,然后直接把这个进来就可以了。我们就可以得到一个table。E。
04:00
这就是我们想要的东西,那对于这样的一个调用,我们会发现它还是有一点看起来有一点依赖我们之前的流市执行环境,哎,那能不能直接就没有任何流失执行环境的依赖,我们直接就把它创建出来呢?其实也是可以的,如果说我们点进去的话,其实会发现当前的create里边,它底层调用的是传入两个参数的create方法。除了前面我们需要去传入的一个stream execution environment之外,还需要传入一个environment settings,也就是环境的配置,那这里环境的配置主要可以用来配置什么东西呢?点进去之后我们就会发现。这里边最重要的其实。我们可以看到这里有一个builder。然后在builder里边最关键的啊,这很明显它又是一个调用builder.build方法去创建一个setting这样的一个套路啊,那很明显它是私有化了构造方法,通过这个调用build build就可以返回一个当前的环境配置,那在这个builder里边可以做什么样的具体配置呢?最重要的我们就看到了一个是定义当前的执行模式。
05:20
另外一个就是指确定选择当前的planner计划器。我们会看到执行模式有两种,当然是批处理和流处理了,默认情况下就是流处理模式,所以如果我们在这里根本什么东西都不传的话,直接使用的就是流处理模式。那另外还有一个计化器,当然默认就是blink bla,所以如果说我们使用了blink的化器,而且使用的是流执行模式的话,流处理模式的话,直接用这种方法就可以了。当然,我们也可以用另外一种方法。定义。我们现在是通过定义settings配置。
06:04
还。环境配置。来创建。表执行环境。所以这里面我们关键是要得到一个environment settings啊,那这里我们可以看到environment settings。我们去啊,这里面不是调它的builder.build哎,我们可以看到就是在这里边啊,本身它并没有对应的这个builder方法啊,然后我们这里边标准的方法,当然我们也可以直接去点这个build访问它的静态类啊,内部静态类那更加标准的方法,其实是调它的new instance方法,这样的话就会直接new一个build出来。要不然的话,我们还得去对应的这一个类啊,所以这里面我们是直接调用它的方法得到一个,然后接下来就可以。当然最后肯定掉的是build build,我们这里关键是要中间做一些配置,我们可以去最标准的当然是in streaming mode。
07:06
留执行执行环境啊,然后我们可以去use blink planner,这其实也就是默认的选项。这样把它定义出来之后。当前是一个settings。接下来我们要做的那就可以直接。我们会看到直接可以不用这个table environment了,直接可以调用table environment的。Create方法,然后这里面可以直接对应的settings进来。得到的当前就是一个table。好,那前面我们可以把这个就注掉了。我们会会发现这种定义方式完全不依赖stream execution environment,不依赖这个流执行环境,直接使用当前的环境表执行环境就可以得到我们想要的东西。
08:01
是一种方法,直接可以调用table environment.create另外我们会发现也可以基于stream table environment去调用create方法,就是传入en的同时,然后再传一个settings啊,我们发现这个其实没有必要,因为对于前面我们看到的stream environment,它其实也是继承自table environment,如果说我们在这个表环境里边直接就可以把对应的配置项都定义出来的话,很显然就没有必要再传入流执行环境了啊,那所以这也是一种定义方式,这两种方式我们可以任选,如果已经习惯于基于流执行环境去定义的话,那可以用上面这种方法,那如果说我们更习惯于使用这个表的配置,然后配好了之后去写的话,那用下面这种方式也是可以。当然默认情况下,因为这里面我们配置的这个stream mode和blink planner都是默认选项嘛,所以可能用上面这种方式会更加简单一点。
09:02
那如果说我们不想用默认的情况,那又怎么办呢?那下面可以有其他的几种特殊的情形。我们可以写一下1.1的这种情形,我们可以不用,首先可以不用plan,不用blink planner,我们可以基于。老版本净化器planner。进行流处理。那当然了,这个过程其实跟上面也是非常的类似。我们只需要定义一个,比方说这个叫SETTING1,然后接下来啊,那当然处理还是in mode,只不过我们接下来用的就不是blink planner,我们看到有一个是old plan。现在01:13版本的话,还是可以允许我们去引入old planner去使用的,但是我们看到已经已经划了一一横线表示deprecated啊,接下来的版本就即将要把它彻底移除掉了啊,那所以一般情况我们也不会这么去使用了。
10:01
那下面这个我们可以加一个table env1,这是基于老版本化器去进行流处理配置出来的执行环境啊,那当然了,前面我们这种情况,这可以认为这个是。金鱼。Blink版本。净化器planner。进行流处理。定义出来的表值性环境啊,那同样我们后面还可以去定义。直接copy一下,可以去定义,基于老版本planner进行批处理。因为我们发现下边的这个settings。除了in streaming mode之外,我们还可以去配置啊,配置这个batch mode,但是这里需要注意,如果说是想要去定义老版本的。
11:00
批处理的话,并不是通过当前的environment settings去做的处理,因为在老版本里边还没有真正意义上的实现批流统一,所以它其实是要基于一个批处理的执行环境去进行调用的。呃,这个我们需要注意一下,那这里边就不是这种定义方式了,它有点类似于之前,呃,我们先定义一个流失的执行环境,然后再创建当前的流失的表执行环境,那同样现在就是先定一个批示的执行环境。我们知道P市的执行环境其实就叫做execution environment,我们可以把它叫做env。哎,那么他去定义的时候,其实就是execution environment,直接去get当前的当当前的execution environment就可以,然后接下来呢,那我们直接可以调用Bach。Environment,我们看这个本身这个类其实也已经要被弃用了,因为这是老版本里面的,对吧?呃,那所以接下来我们是直接调用它的create方法,然后传入当前的Bach env,这样得到的就是一个。
12:11
一个batch table environment。这是我们当前能够看到的这样的一个过程。所以。真正如果能使用这种使用。环境设置环境environment settings去进行配置的,那是那是新版本的基于B版本的进行批处理的过程。接下来我们来。再定义一个1.3。这是基于blink版本进行。批处理。所以接下来我们可以把这个设成SETTING3,那当然了,当前就应该是in batch mode是一个P,然后接下来还是use。
13:00
Blink plan啊,那后边我们这里边定义的这个table en3就可以把它定义成同样的方式,Table environment.create传入SETTINGS3就可以得到的,同样我们看到还是一个当前的。Table environment啊,那这里我们看起来就不区分它到底是留还是P,它是统一的一个表处理环境表志性环境,那这里面我们其实是一个批处理的使用了blink版本计划器的执行环境表环境。这就是整个这个环境的定义。
我来说两句