00:00
我们已经实现了一个table API和flink CQ的基本示例程序,那接下来呢啊,我们再给大家从理论上好好的分析一下,一个最为啊,最为直接的或者说最为经典的定义出来的table API的程序到底应该是什么样的一个结构啊,那刚才我们那一个还算是一个非典型的table API调用,为什么呢?因为一开始进来之后,大家还记得我们是先定义了流,对吧?啊,先是从外部这个文件里边读取出数据来之后,把它转换成data stream,然后基于data stream去生成了转换成了一个一张表,一个table,然后呢,再基于这这个table去做各种各样的操作,那最后我们输出的时候呢,也是把它再转换成流,就相当于我们中间这个table API就是转了一到手啊,就是本来我们可以直接基于get stream API去做操作,然后呢,我们把它这个转成table,然后做了一番操作。
01:00
可以,那大家可能会想到,难道说table API调用的时候就只能基于data STEM去做转换吗?能不能我直接一个table API有一个方法就直接从外部文件系统,或者说从外部这个卡夫卡对吧,直接就读数了呢?是可以的啊,这里边给大家介绍一下整体的程序架构,那首先第一步我们是要创建表的执行环境,这跟我们那个流处理差不多,对吧?大家还记得这个流处理的整体步骤就是首先先创建出流式的执行环境,然后接下来就是三步,Source transform think,其实就是这样,这样一步一步往后走就可以了。现在的table API和CQ的这种程序结构也是一样,只不过呢啊,因为大家知道它底层都是data stream API嘛,都是基于这个来做的,所以它第一步也是先创建表的执行环境啊,那当然了,这个表的执行环境它是要基于我们前面讲的那个流的执行环境,对吧?Stream execution environment,基于它来创建,创建有了这个表执行环境之后,接下来同样还是south transform think对吧?但是这里边定义的方式就会略有不同,而你看我们这里边定义的时候可以用什么方式直接从外部去读呢?
02:15
用一个环境table,呃,Table env直接点connect连接哦,大家知道之前我们用这个connect的时候是data streamam2个流可以做连接,对吧?哎,那是河流操作,这里边的连接指的是什么呢?哎,指的是连接外部数据库,连接外部系统这个这个连接的意思,所以这里边我们就可以connect一个标system,可以connect一个卡夫卡对吧?呃,可以connect一个这个ES,可以connect一个have了,这些都是可以去做操作的,那这里边接下来要做什么操作呢?哎,就是把它连接上之后,基于我们外部系统的那个数据连接上读进来的那个数据,创建一个create temporary table,这是更加一般化,更加就是呃常见的这种表去读取数据创建一张表的方式,而且这里大家注意啊,既然这里边是环境调。
03:15
的一个create temporary table,它就相当于是什么呢?这个input table,这是表名对吧?这张表名这就已经注册在了我们当前表环境的catallo里边啊,就是像上节课我们给大家说的就是你如果从那个data stream直接转换过来的话,它是一个table的这个数据类型,对吧?它并没有在当前的这个表环境catla里边注册它这张表的这个名字啊,所以说你要注册的话,我们还得把它就是转换成一个temporary对吧?后那是我们后面把它转换成视图了,而我们这里边直接从外部读取的时候呢,就可以直接在表环境里边把它注册出来,调的方法是create temporary table,好,那同样后面大家看啊,这是我们这个用于读取数据,这是input table还可以怎么样呢?还可以连接外部系统,然后create temporary table,这是一个用于输出的一张表。所以如果大家直观来看。
04:15
看的话输入输出一样,你看我们这里边这个输入输出几乎一模一样,对不对?哎,只是这个表名一个叫input的,一个叫output而已,那你想把这两个换过来甚至都可以对吧?那他们的这个输入输出我们说呃,整体的这个流式处理啊,Data flow的这个处理结构画出来之后应该是个DG嘛,那本来这俩是应该有区别的呀,那它们的区别在哪里呢?区别就在于后边我们做转换操作的时候,对他们的调用方式。那这里边大家看,呃,我们就可以怎么样呢,利用table API。做一些查询转换,然后就可以基于之前这张表,我们可以得到一张新的表,对吧?诶你看这里面的转换操作是怎么转换呢?之前大家给大家说过了,就是table API你要用的话,那你是不是必须先得得到table这个数据类型啊,那怎么得到数据table数据类型呢?诶这里面又有一个方法叫做table en.from这个from这里边传的这个,这就是当前表环境里边注册好的那张表,对吧?我现在是把那张注册好的表from读出来,得到一个table类型。
05:32
大家看这就是涉及到两套这个东西啊,就我们说的一套是table API要做调用的一个一个前提,前提是什么呢?得到一个table对吧,这是这个前提,然后另外还有这个fli CQ。要做这个呃,写入要做这个查询转换操作的时候,针针对表进行查询转换操作的时候,它需要怎么样呢?它需要在我们当前的环境里边,执行环境的那个catallob里边去注册一张表,对吧?注册这个table大家可以认为他俩是一样的,他俩有转换关系,那这俩怎么转换呢?哎,这里边大家就可以看到了,我们之前,呃在那个视力代码里边是怎么样,我是先从流里边得到了一个table对吧?要转换成这个注册好的table的时候呢,是创建了一个temple review去去得到这张这个table的,让CQ这边用的,而我现在是什么呢?我现在是直接从外部读取创建出来的,就是CQ里边要用到的这个表,对吧?哎,那我要调这个table API又怎么转换成这个大的这个table类型呢?哎,那就是from,因为点from好,然后有了这个table类型之后,大家看后面就是select filter,对吧,你想做什么操作。
06:51
后面我们还有window开窗操作啊,都可以直接去做就完事了,那另外还有一种方式就是直接写CQ对吧?啊直接写CQ的话,这里面因为已经注册好了嘛,你直接把它当成我们当前的这个表名,写在CQ里面就完事了,这就是这个查询转换的一个过程,那这些transform完成之后,大家看到我现在input是有了,我知道从这里边去去读,对吧,你看这个这张表嘛,这当然知道这是一个,这是我们的S,那这个呃,Output作为我们的think是在哪里体现出来的呢?
07:26
哎,最后这里边是result.insert into对吧,直接用这张表,大家注意这是这个结果表,结果表调它的一个方法叫做insert into,然后把我们得到的这个数据into到哪个注册好的那张表里面去,诶大家看到这这个效果是什么?这这就是我们这里边得到的这是一个就包括我们这里边的这个table result,这得到的是什么?得到的是一这一个table对吧?得到的是一个table,就是得到的查询结果的这个表,然后接下来你想要再通过我们的这个写入到外部环境的话,那是不是要通过我们注册在当前环境里面的这张表去做输出啊,哎,所以它掉了table的一个方法,Insert into我们当前的这张表,这样把这两张表就相当于联系起来了,对吧,我这里边得到这个result,这个结果表最终就对应着我们写入的那外部系统。
08:27
成了那个output table啊,它是这样做一个input output的定义和转换,可能这个风格稍微有一点,呃,有一点奇怪,对吧?跟我们之前感觉到的这种,呃,就是真正的刘处理好像已经已经有一点不同了,它更像更像批处理了,对吧?大家看这就是让习惯于写CQ做批处理的程序员感觉更好理解啊,你看你这个做批处理的时候不也是吗?对吧?你先告诉我这个,呃,我这个输入的表是哪张,然后输出的结果我要写到哪张表里边,然后接下来我们就写这条CQ吗?对吧?呃,写这条CQ,把这个得到的这个查询结果拿出来,最后我塞到写入到那个对应的那张输出的那张表里边,Insert嘛,对吧,Insert into不就完了吗?啊,所以这个流程看起来就更像批处理的这个思路了。
我来说两句