00:00
前面我们已经知道了,在这个程序里边怎么样去定义表环境,有了环境之后,那就是标准的流式处理,我我们主要以这个流式处理为主啊,呃,所以而且是后面的代码,如果要必须用到blink planner的话,我们再引入blink planner,如果要不需要的话,我们就用老版本的啊,这个就是啊,这是前提,然后呢,我们的主要处理的都是流式啊,我们最关心的就是实时计算嘛,所以说主要以这个流处理为为例给大家做讲解,那这里边有了环境之后,流式处理的话,当然就是source transform think3步走啊,那这里边首先要给大家讲的概念就是表。因为后边我们所有做的操作啊,都是基于表的操作,对吧,不管是基于这个table,不管是table API还是说呃,CQ其实都是基于表的那。前面我们也讲过啊,那个在这个表环境里边是可以注册catalog的,然后呢,又可以基于catalog去注册表啊,那对于这个表环境而言,就是它会维护一个一个catalog和表之间的这样的一个map,对吧,每一个catal catalog里边有有多少表,这个表注册的数量并没有限制,所以我们可以在里边就是。
01:16
有需要的话,呃,注册很多的表,然后去做各种各样复杂的这个查询和操作都是可以的,那表的概念本来我们都已经非常熟悉了,那对于这个数据库里边啊,或者说各种各样的这个,呃,就是架构里边啊,不同的这个处理引擎里边,它可能都会有对应的一个table的定义,那flink里边的表又是什么样子的呢?啊,它其实就跟我们之前接触过表的概念是差不多的啊,啊就是。以一定的架构,就是以一定的STEM组织起来的这个数据的一个集合,对吧?啊,它就是组织数据的一种方式,由很多行构成,对吧?呃,这一个table里边有很多个肉,然后每一行呢,可以有很多个字段,对吧,就是我们说有有这个不同的列嘛,啊所以整体来讲就是这样的一个一个这个二维的一个数据结构啊,那所以。
02:12
具体来定义呢,每一张表是由一个标志符identity啊,就相当于有一个ID来制定的,这里边的ID呢,由三部分构成,从大到小,这个概念是catalog名,然后数据库啊,Database的名,然后呢对象名,也就是表名对吧?啊,那如果说我们一开始什么东西,像前面我们那个代码里边啊,啥都没指定的话,在这个表环境里边默认是有一个呃,自己的这个默认catalogck,然后会有一个默认的这个database。这是呃,这样的啊,就是我们注册的这个表啊,生成的就定义的表都会在当前的这个默认的环境里边。这是基本的一个一个概念啊呃,然后在flink里边啊,Flink table API里边表可以是常规表。
03:02
啊,就是大家可以认为就是所谓的这个table对吧,注册的时候或者定义的时候就叫table,也可以是虚拟的啊,那就是我们说的那个视图view,其实在我们在做转换过程当中,所谓的这个table。就是这其实是从这个数据库里边啊,关系型数据库里边引引入的一个概念,那大家知道这个关系型数据库里边区分这两个概念,是因为这个真正的table,它其实是实体的,是具体就真正是持久化存储的一个数据空间,对吧?啊,那对于VIVO呢,它相当于是虚拟出来的,我们构建出来的类似于表的一个可以做操作转换的一个视图,所以说这个在关型数据库这样定义是有意义的,其实在flink里边啊,一开始他借鉴了这个概念,事实上我们在操作的时候发现他俩差不多啊,你在flink里边做操作,难道你会直接把这个。表的数据直接去落盘对吧,放到数据库里边,或或者放到文件里边写那个table吗?肯定不会啊,它都是虚拟化的,都是一个,就是我们在内存里边的一个这样的一个组织结构,所以说可以认为大家在做操作的时候,这个table和view啊,大家可以认为是啊,就是基本上是一样的一个概念啊,这也就是为什么前面我们看到之前有这个register table对吧,现在统一都改成了create temporary啊,所以就是他俩大家可以认为是一样的概念。
04:32
呃,然后这里面主要说的就是。如果说我们要做区分的话,那怎么区分呢?就其实操作起来没什么区别,如果你一定要去定义,要去区分的话,那就是常规的这个table呢,我们用来描述外部的数据啊,就是说呃,比方说我们那个要连接外部的文件,连接外部的数据库的表,对吧?或者说消息队列卡夫卡里面的数据读进来了,这样的话,转换过来的这个我们给定义成一个table,对吧?或者从这个呃,Data stream转换来的这个东西,我们是先定义成cable,这就相当于好像是一个实体表一样,因为它对应着一个具体的实体表,那实际上在flink里边,呃,这还是虚拟的对吧?呃,这但是就是说我们为了更加直观的去理解它。
05:19
把这个可以认为这是一个cable,然后在我们内部做转换的过程当中呢,诶生成的那个东西,或者说我们做查询转换得到的那个结果集,我们把它定义成view,定义成一个视图。及时操作过程是一样的啊,这是这个基本概念啊好呃啊,所以前面大家也看到这个代码里边,我们其实有体现的啊,这个代码里边。在这个前面这个示例程序里边,我们把这个一张。一个table类啊,注册成表的时候,我们说现在推荐的方法是create temporary对吧,创建一个临时表,而我们在这个呃。
06:03
哦,这这里边没有没有写出来,我们看一下那个PPT里边给大家之前介绍的这个程序架构的时候,在这里边这个我们注册这张表的时候,从外部系统读取数据,注册表的时候,它用的是什么呢?用的是create temporary table对吧?啊,所以他认为这个是外部有实体的那个数据结构对应的,那这里边我们定义成table,然后里边在做转换的时候,可以认为它是we。这只是一个概念上的定义。好,有了这个概念之后,接下来我们就可以看一看具体在这个代码里边怎么样去创建一张表了,啊这里边给大家要重点介绍的就是这样一个connect方法啊啊,我们可以调用这个表环境里边的connect方法,它其实说的就是要连接一个外部系统。然后怎么样呢?注意啊,连接外部系统之后,必须要调用create temporary table这个方法才能够在catalogck里边注册一张表啊,所以这就相当于是什么我先跟外部系统建立连接,哎,然后呢,我定义好了外部系统里边,哎,它那个比方说啊,我要是从那个MYSQL里边去读数的话啊,那可能就是我定义好了,呃,连接哪个,呃,这个MYSQL服务器对吧?URL定义好连接哪个数据库,然后我要访问哪张表对吧?然后它里边的那个数据结构,STEM码是什么样的,我都定义好,呃,然后这个对应转换过来的这个字段啊,我全部都定义好,这样的话连接好的话,那之后如果我要是有方法要从里边去拿数据的话,那肯定就是那边有什么数,我就把它拿进来,对吧?啊,直接就可以读取这边的数据,读进来之后呢,直接把数据在内存里边啊组织起来,组织成一个表的结构,我们在catalog里边注册一张这样的表。
07:53
当然这个只是环境里边的表啊,大家注意这个不是那个抓完那个table table的实例并没有直接用那个table API可以调用的那个table还没创建出来,你要创建的话,后面我们就在from对吧?啊,From这张表就可以把它创建出来。
08:11
呃,所以这里面大家需要区分一下,就是说我们这里边啊,给大家说的这个创建这个表,创建的并不是那个table类型的那个实例,而是说的是我们catalog表环境,Catalog里边注册上上的这个虚拟的这张表。对吧,那么这个表示可以供CQ flink CQ直接用,直接写CQ可以用这张表的,但是呢,你要调table API的话,它必须基于一个table类型嘛,所以说那个是没办法直接调的,你要调的时候必须有一个from的转换操作。这个大家要要搞清楚它俩的这个区别啊,那反过来同样就是说你假如说已经有了一个这个table类型的话,已经有了一个table类型,那可以直接去做这个,呃,Table API的调用,但是呢,不能直接去写CQ的时候把这个写进去对吧,不能直接from这张表,因为这这不是注册的表,这是一个table类型的实例嘛啊,所以你还需要再去re table对吧,或者创建一个临时视图,这样才能够在CQ里面用它。
09:17
好啊,这就是就是因为涉及到两套API,呃,东西可能稍微转换有点绕,大家搞清楚怎么回事就可以了,然后接下来我们在呃代码里边给大家看一看这个代码怎么写啊,我们还是在这个table API里边,接下来就是。读取数据了对吧,S部分啊,我们去读取数据,呃,第二步啊,第二步读取数据。其实大家发现这个前面我们讲那个整体的处理流程里边,它连接外部系统,不管是读数还是写数,其实都是一样的方式啊,所以这里边我们就是先以读取数据为例,给大家看看怎么样连接到外部系统啊,连接外部系统读取数据。
10:05
呃,那首先第一个最常见的怎么读取数据呢?呃,就是文件了,对吧?啊,我们直接这个读取文件数据。好,先把它定义出来,呃,然后这里边我们就先定义这个文件的路径了啊pass。啊,当前的这个路径,当然我们就还用这个sensor吧,对吧,我们把这个创建出来。呃,这里边我们直接copy copy这个它的全路径啊,Pass直接copy出来。啊,然后接下来,呃,那就是要调用表环境的connect方法,大家看有这个方法对吧,里边要传的是一个大家看到是一个connector scriptor啊,是一个scriptor,那这个4SCRIPTOR又是谁来提供的呢?那就是要不就是link自己给我们提供,要不就是。你引入这个依赖对吧,然后连接器给我们提供,这里边我们文件系统的话,当然是flink自己提供的,这里边我们要拗的是一个file system。
11:10
System。诶,大家看直接这里边啊,Flink table里边就提供了一个这样一个描述器script对吧,就叫做file system啊,然后这里边。大家看啊,这个file system里边我可以去传一些,呃,就是可以指定它的一些东西,比方说指定什么呢?你至少得指定文件路径啊,从哪个目录去去调用这个这个东西呢,对吧,然后得到的这个呃,Pass里边我还可以啊,就是这里边其实不是这个得到的pass啊,直接你只要把这个pass传进去,当前的这个描述器就已经定义好了啊,是我们先把这个传进去啊,不pass。然后呢,基于connect这个connect方法,大家看它会返回的是一个stream table script。
12:02
然后在这个就是流处理的这个表的描述器啊,这是之前里边的那个是连接器的描述器,对吧?然后我们外边这就相当于是一个整个表的描述器,流式表的一个描述器,那这个描述器当然大家看到就你还得定义给码呀,既然是一张表嘛,对吧,那那另外还有常见的一个还得定义什么呢?With里边还有一个叫做with format对吧,我还得定义一个就是。呃,就是格式化的一个描述器,就是你从外部系统读取数据之后,按照什么样的这个反序列化方法给它进行格式化呢,然后再提取数数据字段呢?哎,所以这里边肯定是这两个都需要啊,With format啊这里边我们一般情况哎传什么呢?像我们这里边的这个数据,它不是这个逗号分割吗?啊,所以这类似于一个CSV文件,那这里边我们可以提供一个啊,你有一个这样的一个格式化方法啊,我直接用这个old的CSV啊,大家看这个已经要被弃用了啊,那因为它叫old嘛,对吧,要被弃用了,我们先用这个来看一下啊,这是当前我们这个flink自带的已经提供好的一个方法,好,所以这是。
13:15
这是定义。定义了啊,这个从。外部文件读取数据之后的格式化,也就是反序列化了对吧?方法好,呃,然后接下来我们就是with scheme对吧,你这边提取数据啊,这格式化,然后呢,这里边你的那个skime到底长什么样呢?里边要new一个STEM。啊,那这个我们就直接用flink table啊第这个啊,Scripts下边的这个sIgMa,那定义这个schema,先把它扭出来,里边我们可以用这个链式调用的方式直接定义它的field,对吧,大家看到可以定义它这个每一个字段到底是什么样,那里边的字段就是。
14:05
首先叫什么名对吧?呃,给一个ID叫什么名,然后类型肯定要传入对吧?大家要注意啊,这个field里边它的这个传递方式,后边是要传一个这个data type,最好是传一个data type对吧?啊或者的话,你如果另外传这个呃field type的话,你这个传传这个string类型的话,这个可能后面就稍微的麻烦一点啊,我们更简单的方式就直接传这个data type就好了。Data types来看,直接用Li table API下面的data types,这里边ID它是一个string类型,我直接用string。这是第一个字段,然后哎,我继续读取,还是field啊。诶,这里边这个。我们应该是在这个外边对吧,这里边啊,Field定义完了之后继续往后field啊,然后这边第二个字段,比方说我这个叫time stamp,对吧。然后data stamp data types,当前是一个长整型,我们这里面没有浪,那当然是big int,对吧?啊,类似于这个,呃,MYSQL里边的这种数据类型的定义啊,然后field里边再来一个temperature。
15:15
啊,那同样这里边data types给一个double对吧,把它定义好,这样我们就有了这个数据结构了。啊,所以这里啊,大家看到这个是。这个是定义表的结构。Sche对吧?呃,然后接下来诶,大家看到已经把这些东西都定义好了之后,本身基于这里的这个就是表的这个描述器啊stream table script,那它是一个就是connect table script对吧,这里边我就可以调用。调用一个方法。Create create temporary table对吧?大家看只有这样一个方法,就是连接外部系统只能创建一个好像是对应于实体表的一张表,对吧?呃,这个不能create temporary对吧?啊,这里边就是create temporary table create temporary table里边,这里边给的这个名称路径其实就是当前的表名啊,当然你也可以加上就是我的那个,大家说那个ID,它不是应该是catalog名,然后那个database数据库的名,然后再加表名吧,你也可以从前到后一个一个往后来来输啊,那这里边我们就用默认的就好了,对吧,直接给一个input table。
16:39
直接创建一张。在呃,在这个表环境里边,对吧,在环境表环境里,其实就是在catallo默认catalog里边注册一张表啊,叫做input table。这就是这个过程啊,我们可以测一下这个这个数据啊,大家知道这个要测的话,我们这个测试测试输出。
17:08
当然最简单的方式就是把它转成流,然后做打印对吧?啊,所以如果要是你想把它转成流的话,诶这个有有什么样的方式呢?呃,那那就相当于还是把这个表要提取出来,先变成表变成table对吧,最后再把它转成流,所以这里边我们while定义一个。Input table对吧,它是一个table类型怎么样,怎么样转换呢?大家还记得吗?呃,这个我们说的这个要用flink table api.table啊。我们这里边要用这个env table env的from方法,直接从已经注册好的这个环境里边读出一张表来,转换成table练习啊,有了它之后,那就可以直接转换成流输出了,对吧?这个大家还记得,我们的这个是要求是用这个to a pen。
18:03
要引入它的话,上边不要忘记得有那个影视转换对吧,我们这里边table APS GALA下边把这个影视转换引入啊,那或者这个我们如果不用的话,你可以注掉的啊,啊这两个影视转换一定要记住,一个flink stream apis GALA,一个flink table apis GALA这两个影视转换引入。好,有了这个之后,接下来我们就把它转换成流啊。Upon the stream里边的这个类型呢?哎,大家看到这个应该是三元组了,对吧?Stream这是ID,然后长整型时间戳,最后还有一个double啊,那我们把它这个打印输出,最后不要忘记env execute执行起来,当前这个是table API test drop。好,我们来看一看这个效果怎么样。现在代码已经执行完了,诶大家可以看到我们从之前的这个三这个文件里边读取出来了六条数据,然后map成了三元组样例类,直接做了一个输出,哎,所以这就相当于我们不再依赖data stream做转换了,对吧,直接用这个connect方法来做这个转换,然后输出就完事了。
19:15
这里边多给大家介绍一句,就是这个啊,在这儿写一下啊table env,大家如果要是就是呃,你这个直接点他的这个方法的话,你会发现它有一些注册方法。啊,当然就是环境里边我可以注册这个新的catalog对吧,可以注册这个table,前面我们也给大家说了啊,可以去注册表,甚至还可以注册这个数据流,呃,这个data stream对吧?那另外还可以大家注意啊,里边之前的那个API的写法是什么呢?是注册table sce和table s。啊,这个大家可能看起来更接近于我们熟悉的这个流处理的这种方式,对吧,你是S就注册一个table sce,是think的话就注册一个cable SK,那现在这个方法已经要被弃用了啊,大家看这里边就是说,呃,你现在统一的方法用什么呢?用connect对吧,直接用connect连接外部系统,然后。
20:13
其实这个大家也比较好想啊,就是你直接在这里边去注册这个table sce或者table SK的话,然后呃,其实你真正的这个转换操作大家知道流处理里边啊,并不是你这里边一注册马上就读数了,其实我们这里边只是在定义一个这个处理结构而已,只是定义它的这个处理顺序而已,所以呢,你这里边注册表的时候,并不需要区分到底是S还是think,对吧,这里边的处理逻辑其实可以在哪里去指定的,我们后面你写CQ的时候,不就知道到底哪个是源头,哪个是那个写入的那个目的地了吗?哎,所以我们前面在做连接做注册,完全可以把它抽离出来,诶直接在这里边,我就是直接只是连接外部系统去注册表就完事了。这里好像我们并没有体现出来他是要读还是要写对吧?呃,他是怎么知道他是要读还是要写的呢?后面我们把它转换成表,然后去,呃,转换成流对吧?然后再打印出来的时候,这个才知道,哦,这个表是要写的,呃,要要要读出来往外部去写的,对吧。
21:18
这就是这个过程。
我来说两句