00:00
前面我们用一个简单的案例做了快速上手啊,那这个过程我们实现的需求其实非常简单啊,其实就是按照用户名去筛选一些对应的数据啊,从当前数据里边做一个筛选,如果说要直接调用data stream API的话,那就直接做一个filter就完了嘛,其实也是非常简单的,那在这里呢,我们是使用了table API啊和link CQ,它的关键点其实就是在于把一个数据流data stream转换成一个table,得到一个table对象之后,哎,那可以直接基于它去做各种各样的方法调用,这就是使用table API去进行转换计算,那或者呢,我们也可以直接写CQ from这张表去做查询转换啊,那这两种方式都是可以得到相同的结果的。其实有了之前data没PI的基础啊,那这一部分我们会发现代码其实非常的简单啊,那接下来呢,我们就进一步去进行详细的展开来看一看啊,在table API和弗Li CQ里边各种各样的API到底是怎么样去使用的。
01:06
首先呢,我们先来对于table API和CQ啊写出来的link程序,它的整体架构做一个简单的说明,那前面我们看到这个代码已经很明显了啊,这整体架构是什么呢?呃,其实跟之前的d vpi的这个调用结构类似,首先也是先去读取数据源啊,这当然先要创建这个流式执行环境了啊,创建出环境来之后读取数据源,然后接下来呢,进行转换处理计算,最后再做一个输出,我们这里还是打印输出,只不过呢,诶在中间进行操作的时候,我们基于data stream又做了一种包装转换。添加了一步把data stream转换成table的过程,然后呢基于table进行各种转换,转换完了之后再把table转换成了data,然后做了打印输出。诶,那我们会想到啊,那要这么说的话,相当于这个table API的调用,是不是就是在之前data STEM API的基础上又多了一层呢,多套了一层呢?诶这样看起来其实是稍微有点麻烦的啊,净多了一层啊,那换来的好处也只是我们这里可以直接用CQ而已。
02:16
哎,那我们就想到了,那对于这个data STEM这一步啊,能不能不做这样的转换呢?那就是我们直接用table API去读取数据源,直接把它这个源头啊,读进来之后,获取到的直接就是一张表,能不能绕过data stream这一步,哎,其实是可以的。真正意义上的一个table API啊,对应的程序其实根本不需要经过data stream做中间转换。我们看一下啊,一个标准化的table API对应的程序结构到底是怎么样?呃,同样啊,跟之前这API的流程类似,同样也是可以分成主要三大部分,那就是source,然后transformation中间的转换,最后的think输出,主要就是这三部分,哎,那我们这里边要做的操作呢,首先那就是先要创建一个环境,我们现在既然是调table API或者CQ,那要创建的就是表执行环境啊,啊先把这个table env创建出来,然后接下来呢,诶,注意我们这里就是要创建一个输入表。
03:20
这个输入表,哎,那我们看到现在就不是基于这个流式的执行环境去读取外部数据源了啊,而是直接基于表执行环境去执行一句cqe q CQ,它调用的是这样一个方法,里边当然对应的就是一句创建表的啊,DDL了啊,这样的一句CQ啊,所以我们看到里边执行的其实是create temporary table啊,当然这里加了一个temporary啊,我们知道就是零时表create temporary table,然后比方说我们这里是表的名字啊,就叫做input table,诶,那这么看的话,这就是跟我们在CPU里面去创建一张表这个过程是一样的,那它怎么样能够连接到外部系统去读取数据作为输入表呢?
04:04
那这个数据从哪来呢?注意后边加了一个位子句,这个位就是连接外部系统的关键,在这个位子句里边,我们就可以指定当前的连接器connector到底是什么?那比如说连接到卡夫卡的话,那connector就是卡夫卡啊,那如果连接到ES的话,那connector就是elastic search。有了这样一个输入表的定义,那相当于接下来啊,我们在这张表里边input table里边拿到的就是从外部这个存储系统里边捕获到的输入的数据了啊,那然后接下来呢,同样的操作,我们可以再去注册一个输出表。它也是一个所谓的连接器表,那就是它也要连接到一个外部系统,同样是create temp table某一个表,我们这个表用于输出就叫做output table,同样后边有一个位置子句指定当前连接到哪里,其实我们看到啊,这个语句输入输出其实是完全一样的啊,那所以到底谁是输入谁的输出呢?啊,其实我能想到啊,你都是跟外部系统的一个连接嘛,然后连接好了之后,接下来要做的操作呢,那就看你后边进行查询转换了,你到底是from哪张表,那很显然from的这张表就应该是输入。
05:23
那最后呢,你如果要把得到的结果要写入到哪张表里边去,那对应的那个表就应该是输出,所以我们看啊,对于这个输入输出表的定义和注册它的语句是完全一样的,并不区分,真正区分它们的作用的地方是在后边的转换和处理里边。所以有了这个基本的表之后,后边哎,那就可以直接做这样的一个CQ查询转换了,比方说我们这里就select,哎,什么什么样的字段from input table,那当然得到的结果呢,就是一个新的table对象了,这是一个TABLE1,当然了,我们这是直接执行CQ,另外呢,也可以去调用table API,如果调用table API,那就是啊,直接调用table inv.from某一个table,比方说这里就from input table,那这里得到的呢就是一个。
06:14
Table的对象了啊,这里我们得到的一个table对象,然后再去调select啊,或者where什么什么样的方法,同样也可以得到一个转换之后新的table table2。那得到这个转换处理计算table之后,怎么样再把它输出到外部系统呢?诶,那就要执行一个。Execute insert这样的方法直接针对这个table执行一个插入操作。然后指定插入到哪张表里边啊,这就相当于是把我们这个table对象啊,它里边的数据到底要插入到哪个连接到外部系统的那张表里去,这样一做操作的话,那很显然这就是一个输出表。这里我们看到它会得到一个table result,这只是一个返回的一个think的结果,这就并不是一个真正的表达,真正的这个数据已经写入到外部系统当中了。
07:10
所以我们看啊,这个完整的过程呢,相当于就已经抛开了data stream跟data streamam就没关系了啊,直接有了表环境之后啊,接下来我们要做的就是针对这个表去做创建啊,创建出表来之后,然后就做各种各样的中间转换操作,我们完全可以以纯粹写CQ的方式把这个处理的逻辑全部定义出来,哎,这就是我们在使用这个高层级API啊。Table apiliq里边非常常见的一个通用的方式,那在实际写代码的过程当中,最常见的其实是这种结构。啊,那另外呢,我们还可以多说一句,就是在早期的flink版本当中啊,还有专门用于输入输出的这样的一个对应的接口啊,啊叫做table sce和table think,那我们就看到了这个顾名思义嘛,很明显table sce就对应之前我们的那个sce段子sce任务,那table think的话当然就对应的think任务了,哎,那跟我们流处理里面的概念是一一对应的,不过呢,这种方式定义看起来就更像传统的流处理,更像处理data stream啊,那我们如果真正要是把它当成一个这个关系型的表来做处理的话,那很显然根本就没有必要去定义什么source和think嘛,我们直接写CQ from哪张表,然后最后把它插入到哪张表不就完了吗?诶,所以新版本里边就已经不再去做table s和table s这样的一个区分了。
08:38
输入和输出的表都可以用统一的方式把它们定义出来,然后在后边的转换处理逻辑里边再去指定到底哪张表,这就是输入,然后写入哪张表,这就是输出,啊,这就是我们的整体程序架构。
我来说两句