00:00
我们已经在代码当中创建了一个表环境,那接下来呢,当然就是基于这样的一个表环境去创建各种各样的我们所熟悉的关系数据库里边的那些对象了啊,我们可以创建一个catalog,可以创建database数据库,另外呢,还可以创建更重要的就是。具体的表table,那其实我们知道啊,所谓的table,那其实就是在关系数据库里边最基本的数据组织的形式,本质上来讲呢,Table可以认为就是一个二维的数据矩阵啊。一张表里边整体来看的话,诶,我们可以认为它是由很多行数据组成的,每一行数据其实就是我们这里边的一条数据元素,一条记录啊,那然后呢,每一条记录呢,又可以看成有多个列字段构成的一个多元组,哎,那所以这就是有行有列,这就是一个二维矩阵了,呃,那我另外除了这个表之外呢,我们知道在关系数据库里边,为了方便的去查询表,去对它进行管理,那我们在环境里边去维护一个catalog啊,然后呢,去创建一个database啊,那所以我们对应的这个表,它应该有一个唯一的ID。
01:16
由三部分组成,就是当前这个表到底在哪个catalog下边,然后在哪个database下边,以及当前这个表到底叫什么名字。那默认情况下,其实我们根本不需要去创建catalog database啊啊,就默认情况下使用的叫什么呢?就叫做default catalog.default database,如果说我们创建了一个叫做my table的表的话,那它的完整的ID就是这样啊,Default和catalog default和database之间有一个下划线,这就是我们默认的目录名和数据库名,一般情况我们都不需要单独指定啊,然后接下来的关键就在于怎么样去创建表了。啊,那对于这个具体创建表的方式呢,可以分成两种形式,那这里我们介绍的就是一种叫做连接器表,另外一种叫做虚拟表啊,首先我们来看这个连接器表啊,这其实是我们在代码当中最直观的创建一个表的方式。
02:15
顾名思义,连接器表嘛,就是要单独的指定一个connector,那connector是用来干什么的呢?当然是连接到外部系统啊,所以之前我们说啊,这个标准的程序架构里边,输入输出的表就都应该创建的是一个连接器表,我们这里边要指定connect。创建连接器表的语法呢,就是直接基于表环境去调一个ECQ方法,里边传入一句创建表的DDL啊,那我们创建一个temporary table,然后跟上当前创建表的名称,然后呢,呃,后面当然还应该有对应的这个字段的一些定义啊,所有这个列名称的定义以及它的数据类型,后边还要加一个with子句,这个with子句里边就可以定义当前连接器的类型是什么,然后呢,相关连接器的配置项都应该在位里边去单独的进行定义。
03:09
那这里需要说明的是,首先这里的temporary啊,这个零食的这个关键字可以省略,我们也可以直接简单粗暴的啊,Create table就可以了。另外呢,后面我们看到啊,直接就给了一个表名,那前面我们说这个表面前面不是还应该有catalog名称和database名称吗?如果不给的话,全是默认啊,就都是default catalog和default database啊,所以一般情况我们只要指定表名就可以了。我们可以在代码当中来尝试一下,创建一张连接器表。所以接下来的第二部分,我们要考察的就是创建表的过程吧,这里我们可以创建一张连接器表,那就是去调用table烟的。Execute。CQ这个方法啊,里边传入的是一句创建表的DDL啊,那我们这里边可以。
04:02
直接写一下create。Table,哎,那当前这个创建的表名啊,比方说我们就简单一点。就还是叫做event table吧。Table,然后后边肯定要跟上当前对应的这个描述了啊啊,所以我们。做一个换行这样的一个书写。啊,那比方说啊,我们现在去创建一个连接器表,肯定要连接到外部系统啊,那我们去读什么呢?呃,比较简单的啊,我们干脆就直接去读文件吧,之前我们在测试DSAPI的时候,最简单的读取外部数据的方式啊,就是连接到一个文件啊,那所以这里边我们不是有clicks吗?读进来的都是用户在这个电商网站上的一些访问事件,我们就叫做even table啊,读进来的这张表叫做even table,那数据都从这个clicks里面去取,那对应的这一个字段当然是有三个了啊,我们要做一个解析,解析出来呢,一个string类型的user,一个string类型的URL。
05:03
另外还有一个长整形的time STEM,那当然了,如果说我们这里边考虑到啊,User和time STEM有可能是CQ里边的关键字,那这里面我们可以做一个,呃,对应这个属性字段的一个改名啊,那如果说你就想使用关键字的话,那还得加上反引号,这个会稍微的麻烦一点,比方说这里边我们指定的啊,当前这个字段就给一个,呃,比方说第一个字段我们叫做UID吧,用户ID啊那。它的类型是词缀。一个字符串类型,然后后面是一个URL,同样它也是一个字符串类型缀,那最后还有一个当前的时间戳,我们把它叫做TS吧,不要叫time,那我们知道在CQ里边没有长整型long这个类型,它对应的是。In。哎,所以我们指定当前的类型是begin,这样的话,当前这个表的结构就定义好,注意连接器表后边一定要有一个with子句。
06:02
所以在这里的话,才是我们专门去定义这个连接器的过程啊,那现在我们连接到的其实就是一个外部的文件系统啊,所以我们指定。Connector。前面这里是K,哎,那后边呢,加一个等号,后边跟着的就是对应的value,我们当前给一个什么样的value呢?File system。文件系统啊,这就是我们的连接器啊啊,那本身这个连接器当然不需要单独引入了,文件系统肯定flink给我们内置了相关的连接器啊啊,所以接下来呢,直接去定义对应的一些连接配置就可以了啊,那对于文件系统最重要的当然是一个路径了,指定一个pass。我们现在需要的安娜就应该是input目录下的。click.ctxt这个文本文件。然后另外我们还需要呢,对于这个文本文件,每一个字段要有一个解析的格式啊,因为我们知道当前是以逗号进行分割啊,然后呢,可能还要去除空格什么的啊,这样筛选出每一个字段对应到我们这个表定义里边的每一个属性里面,哎,那在这个过程我们怎么样去定义这个格式呢?诶这个比较简单啊,因为它是逗号分割嘛,我们知道这就是CSV文件的。
07:19
分割的格式。所以这里可以直接指定一个。当前的格式化工具。Format,我们当前的格式就是CSV啊,那这样的话就把这个连接器表创建好了,我们读取clicks.TXT这个文本文件,读取出来的数据呢,呃,也放在了even的table这张表里边,那所有提取出来的字段分别叫做UIDURL和TS这三个字段提取出来。这就是创建连接器表的过程。那创建了这个连接器表之后呢,它其实这个表啊,就已经存在于我们的表环境里边了,那所以接下来呢,我们说表环境里边不是可以去直接执行CQ查询吗?啊像之前我们的这个简单示例里边,快速上手的时候就直接。
08:08
基于当前的表环境去执行一个CQ query,调用它的一个CQ query方法传入我们写好的一条CQ就可以了,哎,所以在这条CQ里边呢,我们就直接可以from当前已经注册好的这个table了,那这跟我们之前的这个使用就不太一样,之前为什么我们不能直接写在这个字符串里边even table呢,而要用拼接的形式呢?就是因为本身这个even table啊,在之前那是转换过来的一个table对象。哎,这是我们代码当中的一个,相当于是一个变量啊,这个变量并没有在表环境里边去进行注册,哎,所以表环境其实是不认识他的,那怎么办呢?诶,我们这里直接做一个拼接的方式,在底层flink table API自动就会帮我们在表环境里边注册一个同名的表,哎,那这样的话就解决了这个问题,而如果说我们直接去读取连接器表的话,创建一个连接器表的话,创建出来的表直接就在表环境里面啊,所以接下来呢,直接基于它去执行CQ就一点问题都没有。
09:13
这是连接器表啊,那另外还有一种方式,其实我们看前面的这个转换,我们也就想到了啊,那假如说我们现在得到的就不是连接器表,或者说基于这个连接器表我们做了各种各样的转换,那得到的这个呢?诶,这个表就真的是一个table的对象了,它是一个我们当前这个代码当中的一个变量啊,一个对象变量,哎,那接下来。如果我想基于得到的这个结果表再进行进一步的查询转换操作的话,那怎么在CQ里面直接去调用它呢?哎,当然我们也可以用这种拼接的方式啊,底层会帮我们做自动的转换,但是我总觉得这个方式不是很爽啊,哎,那本质上它底层做的是一个什么操作呢?其实是创建了一张虚拟表,哎,这就是我们接下来要去介绍的第二种方式,创建表的第二种方式就是创建一个虚拟表。
10:08
它的调用方式我们看啊,是调用了table env这个表环境的create temper review方法啊,所以它创建的是一个虚拟视图,然后里边呢,传两个参数,一个是一个字符串,第一个参数是一个字符串,这当然就是我们在表环境里边注册的表了,创建出来的表名了,然后后边跟着的呢,哎,这其实就是之前我们得到的那个table对象,所以它本质上就是把我们在代码当中得到的table对象在表环境里边又做了一个注册。注册之后,这个名称就可以直接在CQ当中使用了,不用做进一步的转换。所以我们看到啊,整个这个转换的过程当中,在这个注册的过程当中呢,并不需要这样一个表的实体啊,其实就相当于是把我们之前得到的这个table对象在这里边做一个声明,做一个定义啊,做一个注册就可以了,哎,那本质上来讲,这跟CQ语法里边的视图啊,View的概念非常的类似,所以我们看到这里面调用的方法也是create temporary,所以有时候我们把它叫做虚拟表,有时候也把它直接叫做创建一个虚拟视图。
11:20
所以我们会发现啊,所谓的创建虚拟表跟前面我们讲的创建连接器表本质上是不一样,那创建连接器表呢,这是实实在在的连接到了外部的系统啊,那对应的我们就会把所有的数据放在一个这样的表结构里面啊,有可能是读取作为输入,有可能是做写入进行输出,而这里的虚拟表呢,其实只是在我们代码进行表的查询转换过程当中,把对应的table对象要做一个表环境里边的声明做一个注册而已啊,所以这个过程呢,其实是方便了我们在代码当中去进行table API和CQ的调用。
12:03
为我们在代码当中直接去书写CQ提供了很大的便利,而且我们可以在table API和CQ之间进行自由的切换。好,就是如果说我们是一直调用table API的话,这里得到的啊,一直所得到的其实都是一个table对象,如果说接下来我们想要使用CQ啊,那怎么办呢?诶,那就把它转换成一个虚拟表吧,相当于注册一下就可以在CQ当中使用。那经过CQL查询之后呢,诶得到的就又是一个K的对象了,自由来回切换,完全是通用的。这就是我们所说的关于表的创建的过程。
我来说两句