00:00
有了表的执行环境,那下一步当然就是创建表了,对应着我们在data API里边的第一步就是。读取数据源任务,那表本身是我们非常熟悉的一个概念了,Table,它是关数据库里边的基本存储数据的形式,那也是CQ执行的一个基本对象。在flink里边,表的概念是完全一样的,那我们就可以看成这样一个有行有列的二维数据构成的矩阵啊,就是每一行,每一条数据插入进来之后就是一行,然后这条数据里边的每一个字段就是这里面的每一个列。这个定义基本上跟我们之前所熟悉的myq postq方便的查询表,我们知道表环境里面会有一个catalog和表的对应关系,那所以在flink里边一个完整的表的ID,它其实都是由三部分构成的。
01:05
也就是说你要想唯一的确定一个表的话,那应该还包含了它的所属的目录和数据库,也就是我们应该表示出来之后就是。当前表所在的目录点当前表所在的数据库,点当前表的表名啊,就假如说我们直接创建一个叫做my table的表,那么它的ID默认情况下,目录名是default_catalog,数据库呢是default_database所以创建出来其实就是这样的一个表的ID。那具体创建表的方式有两种,一种叫做连接器表,另外一种叫做虚拟表。首先我们来看一下连接器表,最直接的创建一个表的方式,其实就是连接器表啊,那就是所谓连接器表嘛,就是通过一个连接器连接到一个外部系统,然后使用create这样的一个DDL。
02:05
创建一张表,定义出对应的表结构。比方说我们可以直接连接到文件系统,比方说我们可以直接连接到卡夫卡、redis myq ES等等各种各样的外部系统,都有对应的连接器实现,所以我们只要定义连接器就可以了。那这种方式呢,其实就是我们之前在介绍程序架构时候所使用的。执行一个Q,然后CREATE1张表。用with子句定义连接器的这样一种创建方式啊,我们接下来可以在代码当中。把这一部分再来做一个详细的定义,那这里边具体定义的过程呢,当然是传入create这样一个CQ了,或者说这是一个D,这。关键字可以省略掉,而后边的这个connect就是我们所说的连接。
03:01
如果说我们连接到某一个外部系统,那一定要把对应外部系统的连接器引入到当前的项目里边来才可以,才可以真正的运行。到这里,默认情况下,我们没有定义任何的catalog和,我们直接就定义了一个表,那完整的ID当然就是补全默认的这个cat catlo和database,如果说我们想要使用自定义的目录名和库名的话,那我们可以去做一个设置,当然前提就是说我们在当前表环境里边是有这样的一个catalog和database,我们就可以直接。调用当前表环境的use catalog和use database方法设置当前所使用的目录和数据库啊,那注册的表就都在当前的这个对应的目录结构下面了。接下来我们在代码里边可以做一个简单的实现,我们还是基于当前common API test的这一部分代码,直接在下边进行表的创建。
04:08
这是第二步。有了环境之后,我们就做。表的创建,那我们现在可以直接来连接,最简单的方式,我们现在其实已经有这个文件了吧,我们现在要处理的就是even的数据,Clicks里边就已经有这样的三条even的数据了,所以我们可以做这样的一个尝试。呃,那我们可以直接连接到文件系统,所以我们先写一个当前的。Create d dl。直接写这样的一个string类型的字符串啊,那这一部分的话,很显然我们就是要create,然后table,比方说我们当前就直接把它叫做。呃吧,因为我们知道当前其实就是。
05:00
用户点击访问的一些数据。然后接下来我们换行啊,那换行的话,接下来我们要定义的是每一个字段的名称了,比方说我们现在可以定义里边就是user啊,那这是一个string类型。然后接下来还有URL,又是一个string类型。最后还有一个,呃,Time STEM,那我们知道time本身是CQ里边的关键字,所以为了避免跟CQ里边相同,我们这里边可以给它做一个重命名,那其实这个重命名的话,我们只是在声明当前表环境里面的这个表,那这个名称跟我们外边读取进来的数据,它的类型其实是没有关系的啊,就是本身外部名,外部有可能我们定义过的一些字段名称啊。本身没有直接的关系,更何况我们现在想要的是直接读取当前的文件,文本文件啊,那当然这里边没有对应的字段名称了,我们想叫什么就是什么嘛,所以直接叫做TS,这就可以了。
06:04
当然TS我们默认是长整型,在CQ里边的话,没有这个数据类型,我们应该把它叫做,这样的话就做完了这样一个完整的定义。加上后半个括号,后边的关键呢,是要有一个with子句,这才是我们真正想要去做的事情啊,那这里面我们需要首先定义一个connector。啊,后面直接一个等号,我们现在是希望直接连接到文件系统,所以就是file system。有了这个定义之后,对于文件连接到文件而言,关键当然是需要有一个路径了,我们这里面给一个pass。当前的文件路径啊,那我们知道是在当前项目根目录下面input文件夹下的click.txt。最后我们还应该有一个。
07:00
当前文件的解析格式,因为我们当前输入这个文件,我并不知道它是以什么作为分界呀,我们当前定义了三个列字段啊,三三个对应的这个属性啊,类似于,那这里面我到底怎么样去拆解每一行的数据呢?呃,文件里边每一行数据当然就是我们表里的一行了,但是每一个字段我应该定义它的分割符,那所以这里边如果说我们当前就是这个逗号分割的话,我直接可以把当前的这个定义一个所谓的格式化工具。Format定义成。我们这里可以定义成CSV,那我们知道CSV文件的话,就是以逗号分割的文本文件,这样的话就可以完整的实现当前数据的解析了。最后把这个括号加上,就是我们定义的这个创建表的一个DDL,那真正想要创建这个表的话,当然需要去调用table env的。SQCQ这样一个方法啊,我们把这个DDL进来,这样就创建好了一张表。
08:04
当然了,我们不光可以创建准备要读取数据的这张表,我们也可以直接创建一个连接到外部系统,最后是要输出的那个表,这个也是完全可以的。所以比方说我们这里再直接去。创建一张。用于输出的表,哎,那这整个的这个过程跟前边其实是可以完全类似的,我们把这个放在后边。那么上面的这个过程可以直接copy下来。接下来我们这个是。可以把这个叫做create out d d啊,那当前我们可以把这个table叫做table。我们可以想象一下,假如说当前我们就是非常简单的从。原始的这个数据里边提取两个字段,提取当前的user和URL,然后进行输出,诶,那这个就非常简单,我们直接把这个TS去掉就可以了啊,那对应的这个定义我们就少了一个字段啊,那当然了,对于这个字段我们也可以对它进行一些。
09:14
顺序的更改啊,这个并不影响我们最终的结果,那这里呢,如果说我们没有其他更好的方式的话,可以也可以直接把它打印到,就是输出到。文件系统里面去,比方说当前这个我们就叫做。直接给一个pass叫output啊,之前我们那个叫input,这里需要注意的是,默认情况下我们是可以以。当前不同的并行度去输出不同的这个文件的,输出不同的数据的,所以当前的这个pass其实只是一个输出的目录,而并不是文件名称啊,所以等一下我们可以直接测试一下,看一看输出是什么样子啊,当然了,对于这个例子而言,我们会发现前面这个TS我们是为了避免time stamp出现这个CQ里面关键字,把它做了一个更改,那另外这里这个user其实本身也是CQ里面关键字,所以为了避免这样的一个情形啊,或者就是我们可直接给这个反引号啊,那或者的话,为了避免我们也可以直接来一个比方说叫username啊,这样的话就避免了。
10:21
出现CQ里面关键字的这种情况啊,那这样我们就把输入和输出的表都已经定义好了啊,但是这个输出的表这里我们应该把这个执行的CQ也要改一下啊,执行的DDL放到这里,这样的话两张表就创建出来了。这种创建表的方式,我们把它叫做连接器表,都是直接连接到外部系统创建出来的表,那与之对应的另外还有一类,那其实是比较简单一些,就叫做虚拟表,什么叫做虚拟表呢?呃,简单来讲就是我们已经注册好这样的一个表之后,接下来我们就可以针对这个表进行各种各样的转换操作了,那我们知道注册好的这张表,接下来其实就可以直接,因为我们是在环境里边注册好的嘛。
11:10
接下来我们就可以直接在环境里边执行CQ的时候使用这张表,而我们会发现接下来写这个CQ的时候,你就可以完整的把这一个click table和table用在CQ里面,直接去做各种各样的查询、转换计、写入这样各种各样的操作,而如果说我们并没有直接这样去创建表的话,在之前的例子里边我们发现啊,如果是从一个流转换过来的这样的一个表的话。是一个。这个table本身并没有在环当前的表环境里边注册,它只是一个Java的对象而已。那么这样一个表能不能直接在当前的CQ里边使用呢?他是不能的。所以我们会发现,在之前我们调这个query的时候,写C的时候没有直接在后面就from table,因为当前环境里边并没有一个叫做even table这个名字的表。
12:12
当前的table只是一个table的对象,Table类型的对象,哎,那所以当前我们的使用方法是直接在后面加上even table相当于是把它做了一个to string的转换啊,那table API底层默认是如果我们针对一个table做这样的转换的话,那就要把它在当前的表环境里边做一个注册,诶,那这个工作相当于是弗link底层帮我们做了啊,那有时候我们在写CQ的时候,其实是。不想每次都用这种字符串拼接的方式去连接一个table的,那我们就想到,那能不能我直接就把这个table注册在当前的表环境里边呢?啊,那或者说我们前面讲过得到的这一个已经拿到的这些表啊,如果做中间做了各种各样的转换,比方说这里做了这个select,那或者是这里边调用table API又得到了一个新的table,那得到了这个新的table,如果接下来我想写一个C,使用这个TABLE1的话,那我们发现你又只能是在后面用拼接的这种方式。
13:23
能不能直接就把当前table的一个对象接注册在表环境里边。以至于接下来我们可以直接在CQ里面使用呢?就像我在DDL里边直接写一个这个完整的字符串一样,这样这样完整的写下来呢,当然是可以的,这种方法就是我们所说的可以注册一个虚拟表。就是比如说我们调用一个。执行一个一条CQ,得到了转换之后的一个new table,那这个table如果我们想要在接下来的CQ语句里边直接写,不要做字符串拼接的话,那就调用当前环境的create temporary方法。
14:07
去注册一个临时的虚拟表啊,那我们注册的时候,前边这就是注册的表的名称,后边当然就是这个表的。实例了,对象实例了,这相当于把一个table对象和我们环境里边注册的虚拟表联系到了一起,必须经过这样的一个操作,接下来才能直接使用这样的一个new table这个表。这是。前面呃,我们在做这个简单示例的过程当中,这种方式的一个说明啊,它其实本质上是把它创建了一个虚拟表。那我们看到在这个前面使用的这个连接器表里边,它本质上其实我们说这就对比虚拟表的话,这就相当于是一个实体表了,因为它是连接到外部系统嘛,这张表我们可以认为它是真正意义上存在的,可以从外边直接读取进来,它是有数据的,而对于这个虚拟表而言。
15:05
它本质上来讲,它其实并不存在这样的一个实体,我们并不会直接保存这个表的内容,只是在用到这张表的时候,我们把它的查询语句会去嵌入到我们整个的CQ执行计划里面。所以本质上来讲,这个虚拟表它跟CQ里边的视图,就所谓的这个是非常的相似的,我们会发现调用的方法也是create temp临时视图啊,所以本质上我们可以认为虚拟表就是一张视图。那我们会发现这个虚拟表其实就可以让我们在table API和flink CQ之间进行自由的切换了啊,因为我们发现就是如果说我们使用了连接器表的话,注册出来就是创建出来这张表,它就已经在当前环境里边了,这个表是可以直接在CQ里面使用的,而如果说中间进行了各种转换,或者说我们是从流转换过来的表的话,它本身是一个cable对象。
16:10
如果我们在CQ里面想要使用,那就必须create temp table,把转换成一个虚拟表啊,那或者是用这种字符串拼接的方式把它转换成虚拟表,这是连接table API和CQ操作的一个。每一。这就是关于创建表的方法。
我来说两句