00:00
了解了怎么样把一张表转换成流,那接下来我们再来说一下怎么样把流转换成表。在之前的这段代码里边,流转换成表其实也非常的简单,就是直接调用table en的stream方法。直接传入一个data stream,然后就得到了一个table,这看起来其实比转换流转换成表好像要更简单一些,但是这里边呢,也有一些其他的应用,也就是说当前的这个from data stream这种方式呢,这是相当于直接把当前流里边的数据类型。把它完整的做了一个解析,我们当年是event嘛,它有三个字段,直接把它这个po类的字段属性都解析出来,当成了当前表结构里边的列啊,那对应的名称和类型都是我们在even的这个po类里边定义好的对应的那些结构啊,那这样的话,整个这个表就做了一个完整的转换,看起来非常简单,那我们自然想到了,能不能在这里边做一些更加复杂的自定义的情形呢?当然是可以的,所以这里面我们要介绍的是更加一般化的情形。
01:15
我们可以在这里边直接后边追加一些所谓的表达式expression啊,那这里边的表达式其实就是字段名称表示我们在把一个数据流转换成表的时候,要提取当前哪些字段作为表中的列,诶所以这样去做提取的话,就有一个好处,就是我们可以相当于直接就把这个一开始的一部这个select给省略掉了。直接把这个进行投影映射的这个过程。在转换成表的第一步直接就完成了,所以有时候从留作这个表的转换还是比较有用的。转换成表的过程当中,既然我们可以任意的去选取其中的字段,那当然它的位置也可以任意去指定,另外就是还可以调用它的点as方法,这相当于也是一个表达式,做一个重命名,那么接下来如果说做了重命名的话,在当前的表里边,这个字段名称就变成了当前列的名称就变成了TS,这对于一些比方说像time,我们知道在CQ里边,本身它是关键字,或者说是一些呃函数的名称,方法的名称,那这样的话,把它做一个重命名是非常有用,非常有效的方式。
02:37
除了这种方式之外,这样的话,我们相当于直接得到的是一个table的对象,把一个data stream转换成了一个table。那另外还有一种方式是可以直接把data stream不要转换成table对象,直接把它注册在当前的表环境里边,注册成一张表。啊,那因为我们知道,如果当前我们想在表环境里面直接写CQ的话,你想要用这张表,我们是不是相当于还得把这个表做一个注册呢?啊,那为了省去这一步,我们就。
03:09
Flink底层也给我们提供了这样的一个create的方式,那之前我们说你可以入当前注册的表的名称,然后传入表的实例,当前table的对象实例,然后就可以create temp review,注册一个当前,创建一个当前的虚拟表。那我们现在呢,可以不把对应的table传进来,可以直接把data stream进来也是可以的。那这样的话就相当于直接把当前的流数据流data stream注册到了表环境里边,注册成了一张表啊,那当然了,对应的也是,如果我们是po类型的话,里边的字段就全部提取出来,作为当前表的列名称和对应的类型都放在这里了,那如果说我们想要选取一些特定的属性字段的话,还要做重命名的话,也可以去做对应的操作,那这个操作跟之前from data stream这个方法里边是完全一样。
04:11
这就是把一个数据流data stream转换成表的方法,那这里我们可能会发现了,就是前面介绍的这个过程,好像都是。Whole类型,也就是说我们已经定义好的Java简单对象啊,比方说像event这样类型,那这种类型当然比较简单,这里面所有对应的属性,它的字段、名称和类型都已经定义好了,直接把它一转换,转换成表,这是一目了然的事情。那假如说我们知道对于这个数据流而言,它不一定是po类型啊,我们不光支持po类型,还支持所有的呃,基本的那些包装类型啊,那还支持一些集合类型,还支持各种各样的元组类型,那如果要是其他的类型的话,放在这里又会是一个什么样的效果呢?
05:01
所以这里面我们要考虑一下,在。表和流进行转换的时候所支持的数据类型啊,这个数据类型主要是。Data stream转换成表的时候需要去考虑的,因为表转换成流的话,这个非常简单,因为里边每一个字段一般情况都是一个简单的类型嘛,基本类型,所以直接转换就可以了,那现在如果是流转换成表的话,那我们就得讨论了。最简单的情况其实是所谓的原子类型,就是基础数据类型和一些通用的数据类型,也就是不能再去做拆分的数据类型,我们可以统一把它称作原子类型啊,就比方说integer double string啊,这些都是可以的。比方说我们来了一个长整型long类型的data stream,如果说我们想把它直接转换成一个表的话,那得到的这个表应该是什么样子呢?诶,那正常情况我们就知道。首先。
06:01
这个表里面应该就只有一列了,因为你当前我们每一条数据就只有一个值嘛,然后这个列的类型当然就是当前data STEM的对应的这个泛型了啊。我们直接可以推断出来,然后另外呢,我们在定义的过程当中,就直接调用from data stream这个方法,然后把当前流传进来就可以了,另外我们还可以直接在后面追加一个属性字段,诶,我们知道当前你原子类型里面并没有属性字段呀,这是要干什么呢?这是要重命名啊,就是我可以直接把当前这唯一的一个字段重命名成麦lo,然后传进来,所以这个就一目了然,非常简单的使用。那如果说我这里不传这个麦浪啊,不传这个属性字段,不重命名行不行呢?当然是可以的,不传的话,默认当前的字段名是F0。诶,那我们通过F0这个命名,就可以想到什么地方有F0F1这样的命名呢?之前我们说过flink自定义的元组类型里边,它的每一个原子里边的每一个元素就是F0 f1 f2这样去定义的,所以其实原子类型在处理的过程当中,本质上是当做了一元组他一来进行处理的一个结果。
07:18
哎,那对应的我们就知道了,当前做转换是支持元组类型的。类型。那假如说现在我们定义了一个元组类型的硫二,元组类型的硫,想要把它转换成一个表的话,诶,那会怎么样呢?也非常的简单,那就按照当前元组的次序,第一个,第二个,第一次把它转换成我们表里边的第一列,第二列,那对应的类型,我们元组类型都已经定义好了,那对应的名称是什么呢?当然就是元组里面的属性名称,F0 f1 f2,直接按这个去定义就可以了。那这里面我们可以发现可以做提取,这就跟我们之前讲的那个po类的定义一样啊,可以提取某些。
08:03
属于那个字段名称也可以交换位置,另外还可以点as进行重命名,这些方法都是通用。那另外一个比较常见的类型当然就是po类型了,这是前面我们介绍过的啊,一般在项目当中,在实际应用当中,这是最常见的一种应用方式。最后还有一种类型叫肉类型,而这个肉类型其实是弗link给我们定义的关系型表里边最为通用的数据类型,因为我们知道肉就是行的意思嘛,啊,所以本质上在table API里边,每一个表table里边,它的数据的基本组织形式其实就是肉,就是一行一行的肉组织起来的。啊,它本身肉呢,它也是一个复合类型,因为我们知道肉可以有多个列嘛,哎,所以我们本质上可以认为肉就是一种特殊的元组啊,就有多个列,有有这个好几个属性字段啊,多个元素的这样一个元组,那么它的区别在于它本身长度是固定的。
09:10
而且呢,我们没有办法直接推断出每个字段类型是什么,所以你在使用的过程当中,必须要指明当前肉里边每个位置,每一个列对应的类型到底是什么。所以呃,我们正常啊,在创建这个table的时候,直接使用的那个create语句,我们直接去用这个DD执行DD去创建连接器表的时候,就需要把所有的字段类型都指定清楚,创建出来的对应的每一行就是一个肉类,每一条数就是一个肉类。这个时候定义的这个字段名称和类型就叫做表的模式结构stemme啊,这个在有一些数据库里边是有类似的这个定义的啊,当然在不同的数据库里边,可能对于STEM的含义可能不太一样啊,比方说在Oracle里边,在MYSQL里边可能对它的定义是不一样,我们在这里边就是指代表的结构包含了字段有哪些列字段名称和以及它的类型。
10:11
除此之外呢,除了这些基本的定义之外,肉类型还附加了一个特性,就是相当于它有一些有一些原属性吧,就是我们前面提到的肉。有了这个属性,那我们就知道它就可以表示更新日志流了,啊,就是这个主要就有三种,加I,然后减U加U。包含了这样一个信息之后,我们就可以知道每一行数据,我不光能表示当前表里边真正存在的一行,我还可以表示当前对于这个表要做插入还是要做更新操作。这样的话,我们对于。出现更新操作的那些表,就可以进行对应的这个流的描述啊。
11:00
所以基于这样的一个类型啊,肉类型,我们就有了另外一种。把流转换成表的方式,那就是所谓的from changelock stream啊,前面我们讲的都是from data stream,都是这样的一个简单的方法啊,那或者是我们可以直接调create temp review啊,把它注册在当前的表环境里面注册一张表,那另外一种方法就是from change stream,这跟我们把把表转换成的过程刚好是反过来了,把表转换成的时候,我们是有two stream和to change stream啊,这个更一般化一点,那我们现在呢,是from data stream from change stream。诶,那这from change rock STEM呢,呃,注意它是比较麻烦一点,因为这个里边它要求比较高,就是这里边的数据类型,当前的这个传入的数据流的数据类型呢,只能是肉类型。诶,所以就是这里边的肉必须要指定对应的肉看,所以它一般是由连接器直接帮我们实现的,那有时候我们做测试也是可以去自己定义好,就是定义一些测试数据,比如说。
12:10
下边我们这个例子,那就是我们直接调这个肉点of kind方法,然后定义这样的一行数据,得到的就是一个肉,那大家看到这样的话,就前面先给当前的rock,就有insert和update before up up after,那我们知道这个inser当然就是加了,Update就是更新前的数据,当然就是减u update after就是加,这样的话把对应的数据插入进去,这就是一个肉类型的数据,得到的这样一个流啊,那是肉类型的data stream,它就可以调用from changelo stream转换成一张表。这个在有些场景里边还是比较有用的。这就是关于表和流的转换,可能稍微有一点复杂,但是我们实际应用的过程当中呢,其实关键在于搞清楚当前的流到底是只有插入操作,是一个insert only stream,还是一个更新日志流,就是带撤回的这种操作啊,是一个lore,这一点在后边我们讲到动态表和持续查询的时候还会进一步解释。
我来说两句