00:00
好,接下来我们就开始,既然已经有了表的执行环境,接下来我们就可以在里边去注册表了啊,接下来就是真正的这个表的操作了,那所谓的这个表在CQ里边啊,在link CQ里边到底又是什么东西呢?其实就跟我们之前在呃,MYSQL对吧,在QL的这个呃,本身原生的语法里边这个概念是差不多的,它就是一种一种数据组成的一种格式对吧,一种数数据的组织的形式啊,里边我们可能一张表里边有各个不同的字段,然后每一条数据就相当于是这些一组字段的一个集合啊,那么我们是不同的数据来了之后,在表里边就存成了一行一行的数据,整个就形成了这样一个二维的数据保存的结构,对吧?哎,这就是我们所谓的这个表。那表跟前面我们讲的这个环境有什么关系呢?诶这个就非常重要了,前面我们讲到的这个环境啊,所谓的这个table environment。
01:00
主要的功能大家就看到了,它是table API和CQ里边最核心的功能,它可以去注册catalog,然后呢,在这个目录catalog里边去注册表,对吧?啊,那后边我们基于这个表去执行这个CQL查询做转换,另外注册这个udf函数,其实都是基于这个环境来做的。那其实大家想到这个所谓的呃,环境啊里边,它其实就是要在我们内存里边,要把想要保存的数据的数据格式结构都要定义出来,所以它是有这样的一个层级关系的,那一张表,它本身是什么样的一个层级关系呢?啊,就表的那个完整的一个定义啊,它其实是一个标志符,这个标志符是要有。三个组成部分的。呃,就首先是应该要有一个catalog名,有一个目录名,然后呢,基于这个catalog里边又有定义出来的database,有这样的一个数据库名,然后是数据库下边才是具体的这个对象名,也就是表名,对吧?啊,所以大家发现这不就是我们要找具体一个存储数据的一个门牌号数嘛,对吧?啊,就是哪栋楼几单元几号,这不就是这样一个过程,跟我们在CQ里边啊,CQ原生语法里面的定义基本上也是差不多的。
02:19
呃,那么在默认的场景下,大家看之前我们在这个代码里边没有设置任何的什么catalog或者说数据库,就是注册了一个表对吧,就注册了一个表名,那那个那个表名对应的那个catalog和数据库又是什么呢?啊,如果我们不指定的话,默认它就叫做default catalog和default database就叫这个名对吧?所以我们一开始的话,如果你不划分具体的这个不够复杂的话啊,不划分这个具体的目录和数据库,那我们就直接用default就完事了啊,那这里边提出一个概念,就是表可以是常规的,也可以是虚拟的。
03:00
那什么叫做虚拟表呢?呃,虚拟表其实就是大家熟悉的这个视图的概念,View对吧?呃,这个就叫做虚拟表,那在flink里边,它俩主要是一个什么样的区别呢?其实大家会发现啊,在flink里边差不多,因为之前我们在CQ里面,像MYSQL里边,如果大家去区分这个view和table的话,这个概念很明显,因为table其实就是一个实际存在的,在在我们这个,呃,本身这个存储空间里边已经有的这样一张实体的表,对吧?而view的话是在做中间转换过程当中,是在内存里面虚拟生成的一张表,对不对?呃,临时的这样的一张表。而在而在里面大家想,那那这个我是不是所有的东西都应该是内存里边的一个这样的一个处理的一个数据结构啊,所以其实本质来讲啊,它是没有太大的区别的啊,那那那这里边我们为什么还要提出这样的一个概念呢?那就是还要跟我们之前的那个MYSQL里边联系起来的话,那就是有些数据它是不是真实的。
04:07
它是真实存在的那些实体数据啊,哎,所以这就会想到,假如我们是连接到,比方说连接到外部文件,文件系统从文件读取数据,那这个表是不是对应着有一个实体啊,文件里面存着的嘛,那或者说我连接到这个卡夫卡,那大家想卡夫卡里边是不是对应的这个数据是有实体的呀,所以连接到外部系统的这些这些表,我们是把它叫成一个常规表,或者大家可以认为就是一个实体表,对吧。那另外如果要是说这个view的话,我们一般说的就是呃,就是相当于我在当前的这个呃,这个flink内部环境里边啊,做转换的过程当中,临时创建出来的那个中间状态的那些表,对吧?所以一般它都是一个table API或者CQL查询的一个结果集,我们把它保存成一个view对吧,注册成一个view。
05:05
那这里面还涉及到一个概念,就是大家要区分开,我们这儿提到的这个表。大家看没有做区分,就是到底在table API里边使用还是在CQ里边使用,那大家想在table API里边和在CQ里面使用是一样的吗?不一样,因为table table API里边,它本身就是内嵌在Java或者SKY语语语法里的,嗯,那所以本身table是个是个接口对吧,然后我们是不是相当于创建出来是它的对应的这个一个一个类的对象啊,一个table这一个一个表示一个对象,那么我们在环境里边注册的时候,是不是还需要再单独把这个对象注册成一个具体这个这个目录里边啊,环境里边的一个一张表,对吧,才能够执行CQ。或者如果说我们已经注册在环境里了,想要调table API的话,是不是还得把它做一个转换啊,做一个读取对吧?啊,从这个环境里边做一个读取转换,所以这两两套用法还是稍微会有一点区别的,这个大家可能要稍微注意一下啊,那么接下来我们自然就是要在代码里面要实现一下怎么样去创建一张表了啊,那大家会发现这个创建表其实也非常简单。
06:22
就是前面我们说的是不是要连接到外部系统去创建表啊,啊,大家看这里边我们创建的create temporary,这个就叫table对吧,然后你如果要中间转换的话,可能那个就叫create temporary啊,所以这里边我们连接外部系统调的都是创建这个table啊,常规表的一种方法,那这里边的关键其实在于基于table env环境要去调一个connect方法。在这个connect方法里边就可以传一个外部系统的一个连接描述器,对吧?把这个外部系统描述出来,连接上了之后,接下来大家看还要调其他的一些方法,比方说要调什么呢?With format,大家知道这个with format是干什么吗?
07:06
With format是不是就是对大家想连接外部系统的话,这就没准你像我们从文件里面读取数据的话,是不是有可能它都是一串这一行的都是数,呃这个呃,String这个字符串,对吧?那你读进来之后,我们接下来转换成表的话,你不能说这个表里面就一个字段,全是一个字符串,那是不是我要做一个数据的提取啊,诶之前我们那个流的提取是直接一个map,直接把它分割,然后然后转换成这个po骤类是不是就完事了,那现在提取怎么提取呢?诶,是不是就可以我直接告诉当前的flink,你当前的这个格式是什么样的呀,对吧?所以我可以定义一个with format对吧?啊就把它这个就是格式化的这个,呃,当然这个是格式化方法啊,就是这个主要是定义什么呢?我按照什么什么格式去解析这个当前的这个字段,对不对?呃,解析当前的这个每一行数据啊呃,比方说我是按照这个CSV文件的这个格式去解析呢,对吧,还是按照这个Jason格式去解析呢?诶这个其实是有不同的考量的,然后后边还有还有就是威给嘛,对吧。
08:16
哎,大家想就是我当前这个表里边,你总得给我定义出来,这个表长什么几长长什么样子吧,在CQL里边,MYSQL里边,大家想我创建一张表的时候,写DDL是不是要create table,然后里边是不是要指定每一个字段名称是什么,类型是什么都得定义好啊,那所以这里边我们是不是也得有这样的定义,那所以这里面的定义就是withki定义表结构。啊啊,然后最后创建这样的一张表,就是这样的一个过程。其实大家想在那个之前,我们data stream里边是不是也有类似的过程啊,对吧,所以就是在我们这个代码里边,Data stream在创建这个过程的时候,其实只不过就是说。呃,大家看一下之前我们这个代码啊,其实我们前面只不过就是说从这个数据里边把这个直接读起来之后,大家看这个map的过程,这是不是就相当于是有一个基于它的格式,大家看按照逗号切分,这是不是就是基于它的那个CSV格式去做,去做这个解析啊,然后后边把它包装转换成一个po类型,这是不是就是。
09:21
对应的那个字段做了一个提取和定义啊。哎,所以我们这里边的这个with format with gamema,其实就已经在这这个第一步的这个map转换操作当中,相当于是已经实现了,所以后边你基于它再去做,呃,转换成这个表的时候,是不是啥都不用指定了,这个看起来就很简单是吧,那我们现在如果要是本身什么都没有的话,那自然就都得自己定义了,所以接下来我们看一下这个表的定义啊。二啊,我们这个表的创建,我们这里边首先要做的是连接。呃,外部系统读取数据,那第一种情况最简单的,我们是不是可以读取文件里面的数据啊,对吧,读取文件啊,那这里我要写的就是直接,呃,我先把那个文件路径先写下来吧,对吧,因为那个可能稍微有点长啊。
10:22
Fire pass,列出来当前的文件路径,我们还是直接读这个sensor吧,把它先copy过来。诶,前面哦,前面我们没写啊。把这个copy过来,然后接下来。基于当前的这个table env去调用一个connect方法对吧?诶,这里面大家要看就是这个connect方法到底里面要传什么呢?传的是一个连接器的描述器对吧?啊,就是connector script,那这个描述器大家看它是个抽象类对不对,所以接下来是不是我必须得实现这样一个,就继承这样一个抽象类,实现一个自己的类才能才能做到了,哎,那当然了,既然我们都有这个,呃,官方的支持嘛,对应的那个连接工具里边是不是都应该把这个都都有所体现啊,都有所实现啊啊,那所以这里边最简单的这个实现文件系统,当然是flink自带的啊,我这里边只要去new一个,大家看new一个file system就可以了。
11:25
它本身是不是就是一个connector script啊,啊,所以当前我们要的就是它,然后这个file system直接往这儿一放的话,就是大家看到啊,这里边它本身就是个无参的,对吧,那你如果要是无参的话,我怎么知道要读哪个文件呢?诶对,大家想到这里边自然是对应的,可以给一个他的这个pass路径对不对。所以这里大家要注意啊,我是不是接下来还要去调一个pass方法呀,这里边不是set啊,它直接就是点pass对吧,然后就返回一个这个file file system,还是把这个pass就直接保存在当前的这个pass路径里面了,这里边后边加一个点pass,传file pass进来,Connect连接起来之后得到的大家看就已经是一个stream table script对吧?那么这个stream table script接下来它其实要调的就只有一个方法,那就是。
12:25
哎,大家看是不是就是with stemma呀,对吧?那当然了,除了withkima,其实还可以调别的,因为它本身是一个,大家看继承了connect table script对吧?那么在这个抽象类里边,其实这里边呃,除了这个呃,With skima这里边不是这儿啊,另外还有这个table script对吧?大家看除了这个with skima之外,是不是还有另外一个方法叫with format呀?对吧?所以这里边你也可以重写这里边的这两个方法对吧?啊,所以这里边我们就做一个这样的实现啊,这个进程关系可能稍微有点复杂,我们不用管那么多了,直直接实现就可以了啊,直接with format啊,那当前的这个format格式应该是什么呢?当前我们这个不是CSV文件吗?诶,所以这里边我就直接注意with format里边传的是一个format script对吧?啊,那这里边这个script也都是现成的默认flink里边自带的CSV文件的那个格式化工具就叫做OLDCSV哦,但是大家知道既然叫old,那显然就是说这个是老版本的对吧?啊,就是已经要被要被弃用了啊啊那有同学可能想到,那如果新版本的怎么办呢?
13:42
新版本大家自然想到那就是new csv,或者就是CSV,对吧,但是很可惜这里面没有对应的这个版本,哎,那就需要。就需要引入相关依赖了。我们可以打开当前的这个文档。
14:02
把这一个文件系统啊,当前的这个CSV格式的这个依赖,大家可以引入一下啊,就当前的这一个新版的CSV的支持呢,它就相当于是呃,之前我们那个只是一个简单的按照逗号分割的一个这样这个CSV文件的一个提取和描述啊,而现在这个新版呢,它有一些其他的更加复杂的规范啊,比方说它是符合这个RFC4180标准的这样的一个新的格式化描述器啊,它就叫做CSV,所以我可以在这个。Pop文件里面把这个也引入CSV,这个就叫link csv对吧。或者是大家如果想自动把这个做一个自动引入也是可以的啊,引入进来之后,接下来就可以直接在里边把这个就可以引引进来了。这里需要稍微注意一下,就是后面如果大家想要连接一些外部系统的话,比如说比如说卡夫卡,它里边是支持这个CSCSV文件格式的,但是呢,不能用老版本,老版本的那个格式描述器,对吧?啊这个的话,你写入到卡夫卡的时候,它那个有一些需要的那个信息没有带上,它就解析不出来,所以必须得用新版本的这个格式化工具。
15:18
啊,那有了这个之后,另外我们是不是最关心的就是STEM了啊,它的格式对吧,这个表的结构啊,到底长什么样,然后里边这里边大家看要传的其实就是一个叫做ski的东西。我们现在接下来是不是就直接new一个STEM就可以了,New STEM。那么这个STEM里面当然就要有各种各样的一些配置和呃,对应的那些。实现了对吧?那大家看有最主要的一个方法就是field field,那是不是就是指定当前的字段啊,然后这个字段里边的参数,Field的方法里面的参数是什么呢?是不是就是一个string,这就是字段名,然后data type是不是就是数据类型啊,非常简单啊,那比方说现在我们要的这个当前吧ID对吧?那么它的类型,它类型应该是个string,这里边要传的是data,大家看要传的是这个table下边的这个data type对吧?哎,所以这里边我们要要这个data types啊data types点缀把这个传进来哦,这个还没完,大家看这只是一个字段对不对?接下来我们继续来继续field。
16:27
大家看还还可以继续传吗?呃,那第二个字段,比方说我甚至可以改这个顺序啊,呃,大家想就是前边我那个按照按照那个字段去去指定的话,是不是第一个是ID,第二个是对应的这个,呃,第二个应该是时间戳对吧?第三个是这个温度值,我这里边可以改变它的名称,比方说第二个我不叫time Sam,我就叫TS,这也是可以的,对吧,这是完全没毛病的,但是大家想我能把第二个字段叫成temperature,然后把它当成这个double类型的一个温度值吗?
17:03
这个是有问题的,因为第二个字段它它提取出来之后,是不是对应的就是我们这里面定义的顺序啊,我们的表里边第二个字段是不是你定义成了double,但是我们对应的那个原始数据出来是时间戳啊,这个就不合适了,对吧?所以大家要注意这里边的顺序还是要一致的啊,啊,我这里边比方说还是给这个time stamp啊。Data types这里边啊,长整形大家看没有long,但是有big int,这跟MYL里边的写法是不是一样啊,那另外还有就是最后比方说temperature啊,我就叫temp,这个也是没问题的,Data types.double诶把这个定义好,当前就完成了啊,那当然了,即使是with STEM之后啊,这里得到的是不是还是一个连接器,就是表的这个,呃,连接的这个描述器啊啊,所以那这个连接描述器最终我们是要干什么才能得到一个我们想要的东西呢?
18:00
哎,对,它当然是要创建create temporary table对吧?啊,以前的版本里边还留了一个方法,叫做register table SK和register table table sce,那通过这个名字大家就能想到它是不是直接就指定了我当前这个表是用来做输入还是做输出的呀,对吧?Table thinkk table sce,这个就更加明显一点,现在的话就把这个就相当于就这个概念就就不涉及了,对吧,反正都是表,你接下来这个先连接外部系统,先注册,我根本不区分,至于说你怎么用,那就是如果说你从这个表里边读数据from的话,那它就是输入吗?你如果最后是一张表要色into写入,那不就是输出吗?啊,所以现在的版本的API是这样去写的啊,我们用的是这个create temporary table。下边最后一行create出来对吧,这里边给一个pass啊,这个pass指的是表的路径,我们这里边其实是不是就直接给表名就行了啊,因为你完整路径的话是带上那个catalog,带上database对吧?是这样的一个路径,所以这里边我就把它叫做input table,这就是一个读取文件注册表的这样一个过程。
19:10
好,那呃,大家还记得这个,如果我们想看一下它的结果的话,怎么看吗?呃,是不是,呃接下来的话就是当前我现在是只在这个就是环境里边有,如果说我想要去,呃这个最后变成这个流打印输出的话,是不是还得先有,先有一张表啊啊所以这里边我是需要table env,首先是不是from当前的这个input table,这样才能把当前的这个input table才能才能读取出来啊对吧,然后得到的大家看这个from,这就得到了一张一个table,得到了一个表,对不对,Table类型啊,然后table。这个我还是稍微给大家写一下吧,这个是input table对吧?然后基于这个input table,它是不是又可以调env to a stream这个方法,把它做一个转换成流的一个打印输出啊,对吧?呃,那另外我这里边需要它的那个类型,我定义成就叫肉点class,然后做一个print,最后不要忘记env execute执行起来好,这就是我们这个读取数据的这个过程啊,然后这里边我可以运行一下。
20:24
呃,另外大家会想到之前我们这个,呃,Table里边还有一个,它直接是有一个print方法的,但是那个print方法不是打印数据,它是打印当前的表结构,大家还记得对吧?所以我可以在这儿啊,再去加一句这个print print STEM啊,就是回头大家可以看一下当前的这个表结构到底是什么样的啊,我们先看一下当前这个数据应该是什么样子。大家看是不是就是我们原封不动输入的这个一串数据啊,啊从文件里面读取出来,现在呃,转换成这个表之后,我们没有做任何的转换输出,然后就又转成流又输出了,对吧,所以看起来是完全一样的,那这里边如果要是打印这个scheme的话,大家可以看到。
21:13
我们把它再做一个运行。大家看到这里是不是就输出了当前的这个结构啊,对吧?呃,当前它默认的这个就是挂着的这个节点是叫叫root对吧,然后下边是当前我这个每一个字段和它对应类型的这个定义,我当前这张表里面的字段是不是就叫temp了。如果我写CQ提取字段的话,你是不是就不要提取temperature,而是要提取temp对吧?这个大家自然就知道了啊,后面得到的结果都完全没有问题,这就是连接外部系统,连接文件,读取文件数据啊,我们看到的这样一个效果。
我来说两句