00:00
接下来我们再来给大家详细的讲一讲所谓表的概念啊,其实前面我们已经讲过了,就是table environment啊,它这个表环境里边可以注册目录这个catalog,然后呢,呃,又可以这个基于catalog去注册表,那它其实这个catalog相当于我们这里边就就是环境里边就会维护一个catallo和table之间的一个map关系,一个映射关系,对吧,就哪一个目录里边包含了哪些表,然后我们怎么样去把这张表去找到这些东西,其实呃,就是都是在这个环境里边去保持着的,所以大家可以认为这里边的这个表到底是一个什么概念呢?啊,这里边大家需要分分成两部分来来考虑,就是我们前面说的table API里边用到的那个table和以及就是我们在这个flink CQ里边用到的那个table,这两个可能还不太一样,因为我们认为。
01:00
它俩其实可以互相转换,而且使用起来好像也差不多,呃,可能有时候我们就不太区分,但是真实上来讲的话,它俩还是有一些不同的,那我们这里边主要说的这个table的概念指的是什么呢?我们这里说的不是table API里边的那个类型,对吧?那个类叫做table的那个类,它的实例并不是说这个,而说的是就是我们能在CQ里边直接用的,在当前环境里边注册出来的这个table这个表,好,那这个表大家就看到了,它其实它应该是可以归属于某一个catalog的,对吧?那真实事实上来讲的话啊,这个表其实它是应该它的这个名称应该是由什么来指定的,由一个所谓的标志符dent,那这个dent其实是由三部分的,三部分就是首先最大范围它是一个catalog名,就是它属于哪个目录,包含着它catalog,然后下面呢是。
02:00
数据库database对吧?这张表属于哪个库,这个大家就比较熟悉了,然后最后还有一个对象名,对象名当然就是表明了啊,在这个默认的条件下啊,在我们这里边定义的这个默认的场景下,其实就是什么呢?如果什么都不指定的话,那么当前一张表你直接在环境里面注册,注册进来的,它的这个标志符是什么呢?哎,就是叫default catalog.default database,然后点当前,比方说我们的input table对吧,它的完整的这个IID啊,Identifier就是这样去定义的,就是我们默认是有一个default catalog和default database,当然你也可以单独再注册一个自己不同的名字,对吧,就相当于是,呃,另外不同的数据库啊,和另外不同的这个catallo,这是关于这个表的一个定义和注册啊,那关于表里边的内容,其实就跟我们平常大家认识当中的那种关系数据库里边的那种表是一模一样。
03:00
杠的对吧,它也是以一定的架构,就是所说的这个STEM,以这个STEM架构组织数据的一种方式,它有很多行对吧,有很多个肉,然后每一行里边呢,又可以有很多个字段,有很多个这个列的这个字段,然后就组成了最后这样一个二维的数据组织形式,构成了我们这样的一个table,这样一张表啊,所以在flink table API和CQ里边定义的这个表啊,跟我们之前接触到的概念一模一样,这也就打下了啊,后边我们这个很方便的可以使用CQ啊,就像之前我们写CQ的方式一样,去做这个flink操作的一个基础啊,那接接下来我们就看这个呃表还有哪些的特点啊,那这里边还需要给大家创呃就是提出的一个概念,就是表和视图view的概念,因为之前大家也看到了,有些地方它是这个create temporary table,有些地方是temporary view。那视。
04:00
图这个view跟表有什么区别呢?呃,在flink里边,其实啊,它是把这两个用来区分什么呢?就是一种是叫常规表,就是table,另外一种叫虚拟表,就是视图,就是view。那常规表一般用来干什么呢?描述外部的数据结构,也就是说类似于大家可以认为好像是一个实体表的那种感觉,对吧?那但真真的有这样的一个组织的数据结构存在那儿的啊,比方说就是我们要连接外部系统的时候,Connect的时候,然后创建出来的时候,我们就是create temporary table对吧?啊,就是可以从文件数据库或者消息队列卡夫卡里边读出来的数据,构建这样一张表啊,这个就叫做一个常规表,那试图呢,VI呢?啊,VI一般就是说我们是从现有的表里边经过table API或者CQ查询转换操作之后得到的一个结果集,或者说一个中间的一个运行的计算的状态,对吧,这。
05:00
个我们就在内存空间里面把它存着,像一张表一样存着,这个叫做一个view啊,这其实我们在MY里面也有类似的概念嘛,其实都是差不多的,呃在弗link里边呢,其实有时候我们不需要特别严格的区分这两个概念,因为为什么呢?那大家想一下,你这里边的这个常规表table,你说是它可以用来描述外部数据,但事实上我们如果要是连接一个外部系统去读这个数的话,你想想,那最后是不是还是我要把那个数据读到当前的内存来,在内存里边,呃,用这个table的形式把它再再把它这个呃保存起来,描述出来,对吧,那它其实相当于也是虚拟的呀,对吧,就我们用的时候,其实用的还是一个虚拟的,或者说是在这个内存里边的东西啊,呃,我们其实是把那个外部的数据读进来了之后,复制了一份而已啊,但是呢,我们可以认为它是指带的外部数据的,特别是你如果那个输出的时候,对吧,你看那个音色。
06:00
呃,INTO1张这个表的时候,那其实我们如果把那张表connect连接到这个外部系统的时候,相当于我们就是直接往外系统写入数据了啊,所以就这个还是略有一点区分啊好,那接下来我们再给大家看一看这个标准的创建一张表的方法,那最标准的方法呢,就是直接在环境表执行环境里边调它的connect的方法,这个connect的方法大家就看到了,它是要连接连接一个外部系统,然后你连接外部系统的时候,这里边当然就可以传各种各样的参数了,对吧,你比方说这个,呃,连接到文件系统,那你得指定我文件的路径啊,你连接到这个卡夫卡,你得指定我当前的这个,呃,这个brokeer list呀,对吧?呃,就所以说这些东西其实都是呃,对应的一个一个配置项的一个写入啊,然后后面还需要定义什么呢?一般情况下需要定义这两个,一个是with format。就是要定义数据格式化的一个方法,这个主要是干什么的?诶,那大家知道,就是我们在这里边做这个流式处理的过程当中,你涉及到数据的数,这个来回在任务之间的传递,对吧?你这里边讲是我要把它构建成一张表,但事实上最后我还是一条数数据,一条数据,因为底层还是流嘛,还是像数据流一样啊,流逝的这个数据在任务之间去做传递,所以你一定得把格式化方法,也就是说类似于我们的这个序列化对吧,序列化反序列化的这种方式要定义出来,With format,然后另外还有一个很重要的就是你得把表结构告诉我,而且当年这个表里边有几个,就是就我们的那个STEM嘛,对吧,里边的这个字段到底有哪几个,它的数据类型是什么样的,到底有什么样的要求,那都是在这里面去定义的,我们在MYSQL里面,你创建一张表的时候,不一开始也得定义这些东西吗?其实是一样的东西啊,呃,然后接下来啊,接下来。
07:59
还有最后必须要做的一步操作就是create temporary table,对吧?连接上之后调用这个create temporary table,就可以根据我们连接上的外部系统里边的数据,然后根据我们定义好的这个格式化方法和表的结构,创建出这样的一张临时表temporary。
08:22
然后创建出来的这张表呢,它是在catalog里边注册出来的,我我们不指定catalog的话,就是在默认catalog里,对吧,直接把它这个注册进来,这就是创建表的一个过程,好,那接下来我们已经知道怎么样去创建一张表了,接下来我们就得在码里边做一个实现了,比如最简单的方式当然就是从文件系统里边读取数据,对吧?哎,那接下来我们在在这边给大家做一下这个代码里面做一个测试啊。呃,我们直接可以这个在下边第二步操作连接外部系统。
09:10
读取数据啊,当然是要注册表,对这里我们注册表,然后接下来啊,那这个其实就是说你要读哪个文件了啊,那首先我们把这个,呃,我们第一个要测试的是这个读取文件啊,2.1读取文件,呃,这里边首先我们定义一下这个文件的路径,Fair pass,我们还是读那个sensor sensor这个文件吧,直接copy它的全路径放在这,大家根据自己的具体情况来调整这里面代码具体的指定啊,然后接下来呢,诶,我们前面说了直接用table env啊,我现在就直接只用这个老老版本的这个流时处理了啊,因为基本的这些转换操作,不涉及到blink的一些特殊应用啊。啊,我这里边为了防止后面我们受影响,把这些后面的这些啊,通统给大家先注掉了啊,这部分给大家注掉了。
10:15
到这个1.4好,然后接下来直接去他的一个方法对吧?啊这里,然后我们就可以直接去new一个,你看这个connect方法里面要传什么东西呢?要传一个connectorscript对吧,一个连接器的一个描述器啊,那这个东西呢,本身就在我们flink系统里边已经帮我们提供了,这个东西叫什么?就叫做file system。System就这个对吧,大家看这里边就是link table de scriptor,大家看这个啊,它本身就是一个connector de script,然后这一个包呢,是属于当前这个flink table的scripts下边这里边就都是就是本身NK已经自带的啊,给我们定义好的这些这些描述的描述器,然后接下来我们看一下它里边要传什么参数呢?哎,那那这里边大家知道你直接这么一调,诶,可以是空参数对吧,啥都没有,那里边怎么办呢?它有一个pass方法对吧?把当前的路径要传进去,这里面它的构造构造方法只有这一个空的对吧?所以这没办法啊,那我只有空的空的这么调用了,然后接下来啊,给一个这个pass,把当前的fair pass传进去,然后接下来我们就是说的这个啊,是不是要with。
11:43
定义各种东西对吧?哎,最先我们先定义一个with format,就是当前的这个格式化的方法啊,那当前我们这个数据应该是什么数据呢。就关键是得看我们当前这个数据的格式了,对吧?哎,数据的格式我们发现它是逗号分割的,然后这样的一个文本文件,大家想这不就是CSV嘛,CSV就是逗号分割的文本文件,所以这里边我可以直接去拗一个,大家看这里边要传的是一个formatscript对吧?啊,那这个formatscript在这里边我们定义的时候又有哪些具体的,呃,就是具体的实现呢?这个实现有一个叫做old的CSV这样的一个一个东西,大家看就在这个table当前的这个,呃,Table table的script下边,对吧?在在这个里边啊,那当然这个是在bridge里边对吧,这个桥街,桥街的这个包里边,所以位置不太一样啊,那这里边为什么叫old呢?大家知道有old肯定就有new嘛,而且看到这个被画了一横线,这个被被去掉了,对吧,这是被弃用的,要即将被弃用的一个方法。
12:54
但是它调用还是比较简单的,因为我们不用呃,其他的一些依赖直接用就可以用,所以先用它给大家测一下啊,这里边直接把这个创建出来,然后接下来我们还得指定的STEM啊的话,那里面你看传的是一个S对吧,我们把这个出来。
13:14
先把它创建出来,我们调用的就是flink table script下边的这个STEM,那这里边到底得给它什么东西呢?它里边有一个非常重要的方法,大家看到叫做field field不就是指定字段吗?哎,所以接下来我们其实就是在这个里边指定当前的表结构,每一个字段是什么,比如说我当然知道现在第一个字段我叫ID对吧,你看这个field,它里边要求的这个参数就是一个string field name当前的这个名称,另外一个type对吧?啊,就是类似于我们当时定义那个呃,K state的状态的时候给的一个名称,一个类型差不多对吧?这里边也是跟我们那个定义MYSQL里边定义表一样的嘛,啊,那这里边的类型呢,大家看他刚才看到了吧,它要传的是一个data types,然后这是我们把这个引入啊,然后里边大家看可以选择它不同的类型,那当前这个ID当然是一个string类型了,我把这个string引。
14:14
然后接下来,呃,Field,下一个我可能啊要这个time Sam,对吧,把这个时间戳传进来,那data type啊,大家可能知道这是个长整型,但没有长整型,没有长整型怎么办呢?大家知道在这个呃,CQ里边,我们一般管这个数据类型叫对吧?哎,我们直接给一个in这个类型放在这儿,然后另外还有就是temperature。这是一个double类型,Get types.double对吧,这样就把这个完整的表结构定义好了,把这个对齐,然后不要忘记做完这些操作之后。要在后边大家看create temporary table对吧,把这张表创建出来,我给一个表名,这个叫input table啊,这样就创建好了啊,所以整体来讲其实还是比较简单的啊,大家需要就是知道这个流程到底是怎么样的,那至于其他的一些外部系统呢,其实也类似啊,大家可能知道,那你如果要是卡夫卡的话啊,那那就你有一个类似于这个,呃,可能有一个卡夫卡对吧?呃,你如果要是ES的话,你有一个ES啊,如果要是这个,呃,MYSQL的话,你有一个MYSQ啊,当然这个后面我们会给大家讲到这个MYSQL的连接不是这种方式的啊啊,它没有提供这种,就是看起来像一个类,我们直接调用的这种方式,而是用了另外一种,后面我们讲到再说啊,那这里边我们可以看一下这个效果,因为你这个如果不能测试的话,还是比较痛苦的啊,所以大家知道这个以怎么样做个测试呢?我们是不是可以按照之前的这写法啊,我是不是可以把。
15:57
它转成一个转成一个流啊呃,首先我先把这个table先定义出来对吧,因为当前是注册在我们那个表环境里边对吧?呃,然后现在我先把它转成一个table API里边能用的这个table类型转成这个这这这个类对吧?然后接下来要做的这个事情,其实就是EV,大家还记得怎么转吗?From把这张表拿进来,然后直接一转,这就是一个啊,大家看这个方法得到的就是一个table了,对吧?啊而且这里边你如果仔细看的话啊,它这这个里边还有一个方法叫做register table啊,就是大家看这个被弃用了对吧,甚至这里面还有什么register table sourcece register table think,就是你去注册一个table作为我的S,另外注册一个呃,Table作为我的S,这是以前的这种写法,那现在大家看就把这些都弃用了,为什么呢?呃,就是你以前写。
16:57
South和think,这是就是大家学过理由处理之后,从流由处理的观点来看,好理解一点,但是呢,别人如果要是他习惯批处理,习惯写CQ,那就觉得很奇怪,对吧?哎,什么source think,这不就都是表吗?所以干脆现在就不区分了,直接就是,而且连这个table都弃用了,对吧?直接就是一个from,直接用这把它创建出来就完事了啊,所以这就是基于已经注册好的那张表,然后我们呢,把它提取出来,转成一个table类型,哎,这得到的当前这个这是一个table,注意flink table api.table对吧,把它转换出来,然后后面大家就知道了,这个table要打印输出的话,我还得再去转换成一个流,对不对?你要转换成流的话,我们当时说还需要引入一个。
17:50
引入一个影视转换,哎,这个是这里啊CAPI,这里边就是我们引入那个呃,流的表环境的时候啊,大家记得之前这个也是引入环境的时候,把那个改成下划线,这里也是把那个环境的地方改成下划线,接下来就可以看到有影视转换的这个方法出来了,对吧?To aend stream把它作为一个转换,然后这里边给它的这个当前的字段的呃,三元组类型,Stream后面是time Sam是长整形,后面double类型,然后print打印输出,这就是我们当前的这个状态啊,这里边我们是table API test,好,我们来运行一下,看看结果吧。
18:36
好,现在代码执行完毕,我们来看看结果怎么样,哎,大家看,这就是把我们完整的这个文件里边的数据一个一个都读出来,然后做了一个。包装成我们这里边的一个表,对吧,按照这个字段一个一个,最后我们提取出来之后,诶,输出了一个三元组,就把表里边的每一行按照三元组的格式转换成一个流,最后输出的结果就是这样。
19:04
哎,这看起来就是跟我们那个本身文件里边的数据是一模一样的,从文件里边读取数据输出。
我来说两句