00:00
我们已经了解了table API和CQ的基本程序架构,那接下来呢,当然就是按部就班,一步一步来去进行详细的讲解了。首先第一步我们在真正的进行这个表的创建和转换之前啊,还是要先来创建一个环境,我们现在要创建的呢是表执行环境,这就是前面我们所说的table environment。啊,那其实我们知道对于flink而言,已经有了一个流逝的执行环境,Stream execution environment,那接下来基于它呢,又要创建一个表环境,它的意义在哪里呢?其实我们知道对于flink本身,它是一个流处理的引擎,流处理的框架,那本身数据流和表和table在结构上本身是有差别的,哎,所以如果说我们想要使用table API或者CQ。那就需要这样的一个单独的执行环境来对于流和表进行转换和处理,哎,那当前我们这个表环境主要负责干什么事呢?主要就是这么几条了啊,负责注册catalog和表,Catalog呢,就是我们所谓的。
01:10
目录,哎,那这个概念跟标准CQ中的概念是完全一样的啊,主要就是用来管理所有的原数据啊,就是所有数据库和表对应的原数据,那我们可以注册这样的catalog,另外呢,还可以注册表,注册table,在这个环境里面干注册的这些事情,注册之后就可以去进行对应的使用了。然后呢啊,在基于这个表环境还可以去执行CQ查询,前面我们提到的啊,所有的这个CQ相关的操作,我们看到都是基于table env去进行调用啊,那另外呢,还可以去注册用户自定义的函数,也就是所谓的udf啊,我们可以在CQ里边去进行灵活的使用。最后还可以处理data stream和表之间的转换,就像之前我们调用的from date stream和to date,从流里边读取数据去转换成表,以及把表再转换成一个流,都是基于当前的表环境去进行的转换。
02:10
那关于在代码当中的使用呢?前面我们也看的非常的明确了啊,最简单的方式其实就是先去创建一个stream execution environment,然后去调用stream table environment的create方法,把对应的这个比如是执行环境传进去啊,作为参数传进去得到的就是一个表环境了啊,那为了更加明显的去进行测试啊,我们还是单独的去创建一个新的object。一个SC的object,我们主要是测试通用的API,我们就叫做common API test。没方法写出来,首先我们是要测试这个创建表环境。啊,那这个其实是有两种方式啊,一种就是前面我们提到的。还是先来创建一个我们所熟悉的stream execution environment。
03:03
先把它get出来。叫做env啊,那同样还是啊,不是一般性的,先把这个全局的并行度设成一,基于它呢,调用的是stream。Table environment的create方法,然后里边只要把上面我们的这个流失执行环境传进去就可以了,得到的就是一个table en。表值性环境,哎,我们可以看到这个create的方法啊,它本身其实是有多种传参的方式的,最简单的方式就是直接传一个STEM execution environment啊,那另外呢,除了传入这个stream execution environment之外,后边还可以跟上一个配置项,一个setting或者一个table con啊,那这个我们稍后再说,到底怎么样去使用。那有了这种方法呢,其实我们就看到接下来要做的转换就都基于当前的表环境去定义表、注册表,然后去进行转换处理就可以了,但这个过程呢,简单归简单,哎,那有一个非常大的问题,就是还是离不开我们最为熟悉的这个流逝的执行环境stream execution environment啊,那我们就想了,能不能有一种方法让它跟这个流市执行环境啊彻底脱钩,我们就根本不要有之前的这个流失执行环境了,上来之后,诶,我直接就把这个表环境创建出来了,后边基于表环境就去注册表,创建表,然后进行转换了,好像看起来就跟data stream完全没有关系了,能不能这么做呢?
04:36
也是可以的啊啊,那这种方式呢,我们就不再使用stream table environment了,使用的现在使用的这个类是就叫做少了一个strip,就叫做table environment。啊,我们这里需要去引入对应的这个类啊,然后去调用它下边的一个create方法。然后我们看到这里的create呢,要传的就不再是之前的流失执行环境了,它只需要传入一个environment settings这样一个配置,或者是configuration,这里我们可以点进源码里面去啊,我们看到这是一个静态方法,最简单的传参方式就是直接传一个environment settings。
05:17
那这样一个environment settings,一个环境的配置项又应该怎么样去创建呢?诶,我们在讲到了去new它的一个对象啊,但是点进去之后我们看到啊,这个类里边它的构造方法。是一个private私有的构造方法,哎,那所以接下来我们看怎么样去创建它的对象实例呢?哎,我们看到啊,下边会有一个它的build啊,那所以我们可以直接调它的一个静态方法叫new instance去创建一个它的build啊,这个builder当然是一个内部类了啊,创建一个内部类的对象,然后在这个builder里边,接下来,哎,当然里边就可以调用一个build的方法,哎,那构建出当前environment settings的一个对象实力啊,这就是我们所说的设计模式里边的建造者模式。
06:07
那对于这样一个build而言,它还能干什么事情呢?诶,我们看到这里它其实就能做很多对应配置项的设置啊,最重要的我们看到啊,其实就是可以指定当前所使用的计化器planner到底是什么?我们看到它可以调一个方法啊,叫做use blink plan,或者use old plan,这就是指定当前到底是使用老版本的计化器还是。新版本的啊,阿里内部给我们开放出来的blink版本的计化器,那老版本这个我们看到已经要被弃用了,所以默认情况下使用的就是blink,一般情况我们也不用单独去配啊,另外还有一个use any plan。那另外还有一个非常重要的配置项呢,就是下边的我们可以选择当前的执行模式啊,有什么执行模式呢?我们看到in batch mode,另外还有一个in streaming mode啊,所以我们说啊。
07:04
Flink本身是批一体的这样一个大数据处理引擎啊,那我们知道如果说啊,之前我们这个引入的是流式的执行环境的话,诶,那默认情况下我们应该执行的就是流处理了,诶那之前在调用这个data API的时候,我们所指定的是默认是流处理,哎,如果说想要以P处理的方式去执行的话,那可以对于当前的环境去指定一个所谓的runtime mode运行时模式啊,那这个执行模式的话,如果指定为Bach的话,相当于我们就是同样的代码可以去做一个批处理。那现在我们看到啊,如果说我们用这样一种方式,直接基于table environment去调它的create方法的话,很显然我们现在跟流式执行环境就没关系了,它是一个通用的表环境,那这个通用的表环境啊,到底底层是流处理还是批处理呢?哎,当然了,我们这里边流批一体吗?那最底层还是零处理,所以我们可以看到啊,默认情况下,这里其实我们看is stream mode啊,默认是处啊,所以默认如果什么都不配的话,就是零处理模式,如果我们想要使用P处理模式的话,那就调用这里的in Bach mode方法,把is streaming mode改成false就可以了。
08:20
啊,那把对应的这些配置项都配完了之后,哎,那最后我们再把这个builder啊,得到的builder对象实例调一个它的build方法返回一个。Environment settings对象,哎,这样的话就实现了我们这里想要传入的这个参数,哎,所以这里的话,我们用另外一种方式,我们可以把。这个叫做第一步的测试环境啊,第一种情况我们就是。直接基于流执行环境。创建。那第二种方式呢,啊,就是。传入一个。环境的配置参数就是environment settings,然后进行创建。
09:06
所以这里面我们首先得创建这个environment settings啊,调用的是它的下边的静态方法啊,Environment settings调它的前面我们说哎,New instance方法,先创建出一个builder的对象实例,然后接下来哎,我们看到啊,这就可以去调用这里边的啊in什么样的mode啊,我们可以inch mode,如果不调的话,默认当然是in stream mode了。那另外呢,我们这里还可以去要求我们到底use哪一个plan,哪个计化器啊,我们知道如果什么都不配的话,默认当然就是blink planner了。那最后呢,再调一个build方法,得到这样的一个settings,那后边呢,调用create方法的时候,把这个settings传进去。同样,我们也可以得到这样的一个table environment。那当然了,就是在陌生情况下我们会发现啊,这两项其实都没必要去配啊,那所以呢啊,有时候我们也就不用这种方式了啊,直接使用上面这种流失执行环境传进去create出来也是完全一样的。
10:09
所以在后面的代码当中,我们可能就怎么简单怎么来了。使用第一种方法也是正确。这就是创建表环境的过程。
我来说两句