00:00
那呃,接下来我们要看一看table API具体到底是怎么样的一个处理流程,因为之前我们的那个事例呢,大家看是非常简单的,还是先基于一个data stream,我们先把这个数据是读到了这个流里边,然后基于流直接把它转换成表了,那如果说大家会想到实际这个使用table API的时候,可能会想到我我可能不想依赖于data stream API了,对吧?嗯,你想我们这个既然说它是上层API嘛,你如果还是先去转换成data stream的话,这不还是要要依赖于这个下面的data stream嘛啊,所以我们会想到能不能直接比方说像这个读取数据之前,我们是先从这个env啊去read text file,然后把它读成了这个data stream。我现在能不能直接读read text,读出来之后直接就是一个表呢?有没有这种写法呢?那其实还真真的是有的啊,Table API底层是给我们提供了这种就是从头到尾直接用table API把它搞定这种方式的,那大家看一下最标准的一个table API或者CQ的这种写法啊,调用的过程当中程序结构是什么样的?
01:11
首先第一步就还是要创建一个表的执行环境,对吧?啊,这个还是把这个table env先创建出来,然后接下来呢,大家看,就是我先要创建一张表,我们不是读取数据的时候有那个SS任务吗?那大家想我是不是也对应的应该有一个输入表去读取当前的数据啊,所以大家看我现在就创建一张表,创建的方式是调这个env table env的connect方法。这个connect大家想是连接的意思对吧?之前我们是两条流做连接,得到一个connected streams,那现在这个环境要连接,你说这连接的是流跟流吗?那当然不是了,对吧,现在它连接的其实就是一个。外部系统对吧?啊,就是比方说我直接连接到一个文件系统里面去,那是不是就相当于从文件里面读取数据啊,然后create temporary table,创建这样的一张表,对吧,Into the table。
02:09
另外呢,我还可以直接注册一张表,或者创建一张表,这张表就是输出表。大家看在这个,因为我们当前是基于表做操作嘛,那是不是就可以之前我们是按照流程是这个source,然后transform,然后再think对吧,一步一步定义,那现在你如果是表操作的话,是不是我就可以直接把这个表现定义出来啊。我没数据也可以直接把这个表先定义出来,对吧?啊,所以你看我就是先把这个input table定义好,Output table定义好,那就是从这个外部系统连接这个外部系统读取数据写到这张表里,然后最后我做转换之后的那个数据呢,诶写入到这张表里边,然后是不是这张表要连接到外部数据,呃,外部的比方说卡不卡啊ES对吧?MYSQL连接起来的话,是不是就相当于直接写出去了,诶所以大家看这个完整的流程就靠这种。
03:04
定义表连接,连接外部系统定义表就可以把它搞定。那中间的转换操作,当然就是基于我们已经注册好的这些表,对吧?啊,那比方说你要做这个,这里边大家看啊,它是直接在环境里边注册出来的表,对不对。哎,所以这就又涉及到一个问题,在环境里边注册好的表,CQ里面可以直接用了,但是在table API里面不能直接用,因为大家想table API里面是不是我需要得到一个table这样的一个数据类型啊,对吧,Table的一个对象,然后才能掉嘛,所以大家看它还有一个基本的操作是要因V再去from。Table,再去from from,直接就写我们在环境里面注册的这张表,把这个table类型的这个数据结构拿到就可以了。拿到对应的这个对象对吧,然后接下来再去select对应的做转换操作,当然我也可以直接写CCQCQ的话,大家看直接select什么from table对吧,啊直接做这个,呃,提取转换筛选啊,得到这个结果还是一个。
04:08
然后最后怎么样,大家看到我就可以直接把对应的这个result啊,对应的这个table直接写入到输出表里面去,对吧?啊,当然这种写法更像是这个,呃,就是SC里边那个写法,如果是我们现在是当前是这个Java的,呃,环境的话,语言语言的话,我们直接应该用的是。呃,就是,呃,就是当前啊,如果说我们是想转换成流的话,是需要调用这个cable env to a stream这样的一个方法去做一个转换,对吧?那如果说我们是直接想把当前这张表写入到外部系统的话,那大家看是不是可以直接用这张表调它的insert into这个方法呀。然后里边传的就是当前是不是我定义好的输出表的那个对应的那一个名称啊,啊,所以接下来你看直接insert into这张表,这是不是就是把我当前得到的结果表输出了。
05:07
这张这里边的这个output table是不是连接到外部系统,就相当于是一个thinkk啊啊,所以大家看直观来看的话,还是之前我们讲到的流式数据处理的三个步骤,就是前面首先是有那个环境,对吧?有了执行环境之后,首先是source读取数据源,然后是transform,我们做转换,最后是think输出结果,对吧?只不过这里边table API和CQ的这个流程把它转变成了。哎,就是首先这个SS是创建一个表,连接外部系统,创建一个表对吧,读取数据,然后呢,诶后边的这个执行的这个过程,这就是基于table API去做转换,或者是直接写C执行就完事了,得到的都是table,那最后think这一步还是创建一张表,注册一张表,然后调这个某一个执行结果,那张表的insert into,把它写入就可以了。
06:02
所以其实大家会发现,我们之前的这个注册表,它其实是不是根本就可以不区分输入还是输出啊。大家是不是发现了这里边根本就这里边,我叫名是教程input table和output table,其实大家知道我这里边只是连接外部系统而已,并不知道当前到底是要输入还是输出对吧?那输入和输出其实这个流程是在哪里能够看出来,能够定义出来的呢?是不是就是在后边这里啊,你看我如果这里边from的话,那是不是这里边就是这应该是一个S啊,相当于一个table sce对吧?然后如果说我最后要把一张表要ins such into这张表的话,这是不是就是一个table think啊啊,最后的一个think啊,所以整体的流程都是通过后边来定义出来的,三步source transfer think。这就是flink table API和CQ里面的基本数据程序结构。
我来说两句