00:00
通过一个快速上手的简单事例,我们已经了解了table API和flink CQ大体应该怎么用,不过在这个简单事例里边呢,我们会发现这个过程当中,只要把它转换成了表,转换成了table,接下来要做的事情就非常简单了,我们直接写CQ转换,进一步的对表进行转换,得到新的表就可以了。那关键是这里边流转换成表,我们是直接调用了一个表环境的from date streamam方法,我们是先定义了硫,然后才把它转换成了表跟外部系统连接的。这个过程其实还是通过流处理里data stream,这里边我们使用这个流处理的执行环境调用source方法得到了一个data three。相当于我们还是没有绕开data做了这个转换,那更加一般化的表处理的应用程序是不是一定得先得到一个data stream才能进行转换呢?其实不是的,真正意义上的表处理转换,我们可以完全抛开data stream,直接就跟外部系统去做连接,基于表环境直接去创建一张一张类似于数据源一样的输入数据的表就可以。那接下来我们就要。
01:18
详细的讲解一下表处理的流程,它的基本API到底是怎么调用?首先我们来介绍一下一个flink的表处理程序,它的架构整体应该是什么样子,其实跟之前我们介绍过的data API非常的类似,因为它的底层相当于还是流处理吧。所以我们还是应该分为。也就是数据源读取数据源,然后transform做中间的数据转换,最后的SK把数据写入到we部系统这样的三大部分,当然前面还应该有一个创建环境的过程啊,那当前如果说我们现在是table API或者是CQ的话,我们对应的这个过程其实也是类似的,只不过数据源读取之后,我们应该就是得到一张表,得到一个table,然后后边的转换呢,也就是基于表去进行各种各样的转换,最后输出数据的时候也应该是直接把表输出到外部系统就可以啊。最好的方法当然就是不要再把表转换成流,然后再打印输出,这样的话相当于又过了一到手啊。我们希望的是通过table API能够直接取代之前的data,能够完整的实现端到端的这个处理流程,所以接下来我们看一下最标准的程序结构应该什么样。
02:42
那首先当然了,我们还是需要有一个基本的执行环境,我们现在要创建的就是一个表的执行环境,关键是要有它,然后接下来我们需要创建一张输入的表。而这里面我们看到创建输入表的时候用的方法就不再是前面简单事例里边从流去做的转换了,而是直接调用了c env表环境的。
03:10
Executeq这样一个方法,也就是直接执行了一句。那跟之前我们执行CQ的过程不一样,之前我们是执行了一条查询,而现在呢,执行的就是创建表的CQ,本质上来这个该做个D,那我们看一下这个D,那就是create temporary,当这个以可以省略,我们可以直接create,然后后的input,当然这就是一个输入数据表的名称啊,那这跟我们平常在里或者其他的关系数据里使用DDL去创建一张表的过程是完全一样的,那后边当然应该还有对应的当前表的。每一个列的定义啊,就相当于表的架构啊,STEM要去在后面生成出来,跟我们平常定义表是一模一样的,那区别在哪里呢?关键就在于我们不光是要把表定义出来。
04:08
因为我是要连接外部系统去读取数据,读到这张输入表里,那我还得告诉到底是连接到哪个外部系统啊,所以接下来我们还有一个特殊的子句with子句。这个位子后边接下来的这一部分,当然就是对应的到外部系统的连接相关的配置,那最关键的有一个字段是connector connector连接器嘛,所以这个connector这里边用的是单引号,因为我们知道这是在一个DDL,一个string里面嘛,所以这里使用的是单引号。后边它的本身的值的定义,也就是非常简单的直接等于什么就可以了啊,那自然可以想到,如果是连接到卡夫卡的话,哎,那就等于卡夫卡,如果是连接到呃,MYSQL的话,那就应该GDBC啊,那当然除了这个基本的connector定义,后边肯定还应该有一些基本的配置,对应的那些配置项是要有的啊,比如说呃,像这个连接到MYSQL肯定那就有GBC对应的那个,Uri对应的那个呃,连接啊,那是需要有的,然后可能有这个username password啊,对应的一些信息肯定也是也都是需要有的,那如果连接到卡夫卡的话,应该有对应的topic,对应的booswap server啊,这样一一些相关的信息我们都应该放在里面啊,那这个是自然能够想到的一个配置的方法。
05:32
这是标准的使用table API和flink CQ进行数据读取和写入的方法,就是直接用环境调用表执行环境的SQ的CQ方法写一个DDL。所以我们会发现这就跟我们直接在MYSQL或者post CQ里面去进行操作完全一样,只不过就是把连接的这个连接器用一个with子句去表示。这是第一步,创建输入表,对应着我们DPI调用时候的source source。
06:05
然后接下来呢,又有所不同,按照我们之前的规律,那应该a source之后就应该得到了一张表,接下来我们就应该定义这张表的转换了,诶,但是在标准的table API里面呢,它会更加灵活一点,我们可以现在先不做转换,直接继续去定义表,我们看接下来做的是注册一个表,同样还是连接到外部系统。用于输出,也就是说相当于把我们之前的最后一步那个think直接就添加在这里了,为什么他可以这么做呢?因为我们可以想到,现在如果我们把它都转换成了表,那就跟之前的data stream流不太一样了,因为如果是的话。数据流,哎,那么一定是严格意义上按照我们先后执行的任务顺序,要把每一个算子依次排列下来的,那肯定一开始是S。
07:01
最后是一个SK,哎,这个是绝对不会改变的,一定是按照它的执行顺序,而现在如果我们是表的话,那很明显那就是。S,一开始我们就是连接外部系统,往里边去塞数,然后呢,诶,最后的think,其实我是要定义一张空表,然后中间经过各种各样的转换,把这个表写进去,它会连接到外部系统,然后直接就写入外部系统就完了嘛。所以我们会发现写入数据的这个过程其实是我们中间转换的过程,而对于表的定义,表的声明,它里边各种字段的结构,以及跟外部系统的连接,这些很明显是一开始就可以定义出来的。所以我们会看到现在就可以直接把输出表也直接注册出来。而且这里方便的一点在于,输出表跟输入表定义方式完全相同,样还是基于表环境去调用XQ的CQ方法,里边写一个D去create table啊,这里边我们create创建了一个叫做table的一张表用于输出,后面同样用一个with子句指定当前的连接器连接到什么样的外部系统,写到哪里去,这就是我们的S环节,直接就在这里放。
08:20
放在这里啊,那可能我们会发现,诶,要这么说的话,这输入和输出是不是就靠这个名称来做区别呢,上面叫input,下面叫output呢。对于我们的代码逻辑来讲。我们。通过这个名称来进行区别,对于flink底层,对于flink的table API来讲,这两张表其实没什么区别,我们看到它只是定义好的两张表,连接到外部系统的两张表而已。啊,那至于说它到底是用来读取数据还是用来写入数据,这是通过什么定义的呢?其实关键是要看后边的查询转换操作,所以接下来我们真正的从表到表的这个转换操作,才真正意义上代表了哪张表是输入,哪张表是输出,哎,那什么样的表是输入呢?很明显就是我们执行一条CQ,然后如果是s select什么什么from from这样一个input的table的话,很显然这就是输入表啊,你要从这里面读数,所以很明显就是连接到外部系统去读取数据。
09:26
这是。那什么样的表示输出呢?啊,那最后那就是我们应该能够。把得到的啊,前面我们这里是可以直接写CQ,也可以去使用table API去做各种各样的转换啊,那当然了,就是使用table API的时候,我们也可以用环境去调一个方法,当然我们也可以直接基于前面啊,就是假如说啊,我们已经创建出了一张表的话,对应的table的话,也可以直接去调对应的这个select的方法,那调哪个表,当然对应的这个表就是输入了。
10:00
做完了转换之后,最后哎,那我们会看到可以有一个输出的执行过程,这是要调当前表的。一个方法叫做execute insert,哎,所以就是把我们得到的那个转换之后的结果写入到哪张表里,那很明显写入到output table里,这就是对应的输出表,输出到外部的连接到的那个外部系统了,所以这就是think。所以它的到底是圆还是最后的写入外部系统的think,都是通过我们这里的后边的转换执行逻辑来定义出来的。真正意义上的一个完整的table API程序应该是这样的,一个就是从一开始,然后就是各种之后呢,就相当于是针对这些表去写各种q select啊,那我们知道最后执行的这个X的insert,这是相当于了一个C的方法,呃,本质上来讲,这也就像我们直接去调用一个音色的那样的一个DDL是类似的啊,那所以。
11:08
这就是整个完整的从输入到转换到最后输出的完整过程。我们可以看到,通过这样的一个结构的定义,就已经把之前我们所说的这个简单事例里边的先得到硫,再把流转换成表,最后再把表转换成硫,再做print打印输出。就把这个过程。直接就去除掉了啊,就相当于没有中间商赚差价了,直接进行表的转换,直接跟外部系统进行连接操作。我们也可以知道,对于这里边定义的input table和output table的话,我们也可以直接给一个别的名称,只要我们清楚它是用作输入还是输出就可以,那输入的表显然我们这里要from那输出的表的话,就是最后insert要把对应的数据要写入进去,这就是我们处理table API它的整体的。
12:04
程序结构啊,那在早期的flink版本里边啊,它的table API其实是有专门两个类的定义,就是table sce和特和table s,这两个定义的话就非常的明确,我们一看就知道,诶,哪一个是用于输入,哪个是用于输出的,那table sce的话转换过去最后对应的就是我们的S算子,Table的话转换过去就对应的是我们底层it stream里面的S算子,那后来的话。就会发现这种定义跟我们关系型表的定义啊,和这个CQ的使用这种习惯是不太一样的,所以就把它包起来了,直接就包在底层了啊,那上层的话,我们看到的就都是完全相同的表的定义方式。
我来说两句