00:00
我们已经知道怎么样去创建表,然后对表使用table API或者直接执行CQ去进行查询转换,得到的结果表再写入到外部系统当中去啊,所以整个的这一套流程跑通,我们现在已经知道怎么样使用table API去实现我们的业务需求,我们可以完整的写出一个link。不过在实际的开发应用当中,我们知道一般测试这个业务逻辑的时候呢,首先肯定不会创建实际的连接表,把它写入到外部的文件系统,或者是写入到数据库里面,这个就太太重了,这个操作太重了,一般我们都是在控制台去做测试的打印输出,而测控制台进行测试的话,我们使用连接器表的这种定义方式稍微有一点麻烦,因为相当于还是得写一个完整的CQ,而且我们要明确的知道得到的这个数据。数据每一个字段数据类型啊,结果表里面的这个结构到底是什么样的,这个看起来就比较麻烦,有没有更简单的方法呢?
01:06
其实之前我们在。简单上手的事例里边就有非常的实现,那当然就是把它转换成一个数据流,一个data stream,然后就可以非常快速的直接把它打印出来了,这个代码就会非常的简洁啊。所以接下来呢,我们就来介绍一下在flink table API里边表和的转换操作。啊,那这个表和流的转换,前提我们要说一下,就是当然必须是针对流处理程序了,如果是批处理程序的话,我们就没有意义,没有没有必要去做这样的一个到data stream的转换啊,本身你这个就是批处理嘛,一批数据嘛,所以我们对于to data stream这样一个方法的调用,也应该是基于一个stream table environment流的表执行环境。我们才能调对应的这个转换方法啊,那所以这一部分我们就还是在这之前的这段代码里边去进行一个测试和说明。
02:09
啊,首先我们先来介绍一下。把一个表转换成流的方法,那前面我们都已经看到,非常简单,就是直接调用它的to string方法,然后里边把对应的这个表table的实例啊,对象实例传进来,就可以得到一个data,接下来我们直接调print方法打印可以。这种方式看起来非常简单,但是我们会想到那假如说。我们这里前面我们这里做的是一个非常简单的CQ,就提取的字段打印输出,假如说我们现在还有类似于之前这个比较复杂的CQ转换呢。而且我们这里边把当前的。这一句。放copy过来,当前是一个聚合转换。
03:00
进行这样的一个CQ执行啊,那前面我们这里边用到的是table,所以还是把这个我们没有做任何的注册啊,或者我们可以在前面再加一句。就是如果说我们这里想直接在CQ里面使用的话,那就调用table env的create temp view方法把当前的。表的对象注册到环境里面啊,那比方说我们这个就把它注册到叫。然后event table对象实例放进来,接下来我们就可以直接用这个table,那然后我们统计的。当前我们这个就叫user啊,没有叫username,所以就直接s user,然后u from当前的click table,那后面也改成group user,这样的话就可以把它查询出来。然后接下来。想要直接把它做一个打印输出的话,诶,那既然当前是流的表行环境吧,我们就可以想到了直接to。
04:06
然后里边传进来的是a j result,然后再做一个打印print。这样是不是就可以了呢?我们可以尝试一下运行。我们会发现这里直接报错了,之前我们运行代码是没有问题的,现在报错,那就一定是我们聚合转换这个打印输出的问题,我们可以看一下。报错信息到底是什么?我们可以看到当前定义了一个table s,因为我们知道既然是做了转换之后要打印嘛,很显然,呃,我们当前就是有一个对应的S任务的。在table API的底层,本身是区分table和table table就相当于是我们的任务嘛,所以我们看这里的这个table s,它有什么问题呢?并不支持update changes。也就是说,更新操作。
05:03
而我们这里边为什么它有更新操作呢?因为它有group agg操作group,然后select啊,所以我们就会发现,既然是分组做了聚合,很明显这就是类似于之前我们的一个KY之后做聚合的操作嘛,它是有更新操作的,在我们前面做了这个直接控制台打印的时候,我们发现它是有减优加优这个操作的。诶,那为什么他直接to date stream就不能转换成功呢?因为我们这里直接to date的话,默认就是接往后添加数据的,就是来一个往后添加一个,来一个添加一个,那这个时候如果他做了更新操作的话,相当于我就解析不出来了。这里就有了这样一个问题,大家那我们就想到,那难道说当前直接转换成流这种方式就行不通吗?并不是这样的,因为既然我们在这里使用连接器表能够把它成功的打印输出,很显然它也是可以转换成硫的。
06:04
诶,只不过这里面我们转换的流要复杂一点。这里要转换的是一个。我们也看到了下面有一个方法叫做to changelo stream,什么叫做changelo stream呢?Changelo更新日志,那更新日志的话,当然就代表了我当前如果要做更新操作的话。那就得记录到底更改了哪一条数据,诶,那我们想更改哪一条数据,如果你用日志的形式把它表示出来的话,应该怎么记录呢?诶,当然我们可以说,你就说哪一条数据发生了更改,更改成什么样子不就完了吗?诶我们会发现啊,当前如果说我们还是要转换成流的话,你这么长一串,显然这是不符合我们要求的,因为我们转换过来之后应该还是一个啊,就是一条一条数据的这样的一个记录,那我们应该把它记录成什么呢?那就是。当前你要更新哪条数据,我可以看作要把它删除掉,然后如果删删除掉之后更新成什么样子,那就相当于又插入了一条什么样的新数据,所以更新日志本质上一次更新就可以拆成两个数据,两条数据两步操作,第一条第一个操作是。
07:24
删除掉一个旧的数据,然后第二步操作是增加一条更新后的新数据,这就是我们所说的更新日志流啊,那得到的这个数据,那就是每来一个更新操作,每得到一次更新之后的结果,一条数据对应我们转换成流的话,就应该有两条数据了。啊,那当然在最后如果要打印出来,或者说是进行处理的时候,我必须得记录当前它到底哪一条是要删的那个旧数据,哪一条是真正我们更新之后的新数据,所以我们前面再加一个前缀,就是所谓的减U加U这样的一个操作,呃,这样的一个前缀定义。
08:10
那个减U加U,以及我们前面看到直接插入时候的那个加I。这个在API的底层,其实是叫做,也就是当前我们这一行数据的类型。哎,那当前的这个类型呢,如果是加I,那就表示是新增插入的一条数据,那如果是减U呢,诶,那就表示是。更新前的一条数据,那加优呢,就是更新后的一条数据,这就是我们关于这个前缀的一个具体的解释啊,那我们其实发现了,就是当前的这种定义啊,用两条数据减之前的一条,然后加上更新之后的一条,两条数据表示一个。更新操作那有点像什么呢?有点像是我们在微信里边发了一条数据,然后做了一个撤回,然后又发了一个新的数据,做了一个更改一样啊,那所以有时候这种流呢,这种表示更新日志的方式啊,有时候也叫做。
09:13
我们可以看到叫做撤回流retra stream,诶,所以这个时候我们直接用这个,当然了,如果是to rere stream的话,这里我们后面还要传入对应的这个class啊,这个会稍微的麻烦一点,所以一般情况我们直接就用更新日志就可以了,To changelo stream就可以。那我们现在把它改成,然后再来执行一下,看看效果怎么样。现在我们看到就可以真正的得到对应的输出结果了。我们现在可以。停下来看一看它的输出结果是不是符合我们预期的,这里边我们看到诶来了一个carry瑞的数据,那我们统计出来就有一个carry瑞一,这个是加爱,插入了一个凯瑞一,那来了一个丽丝的数据呢,当然就是加爱爱丽丝一,又来一个爱丽丝的数据,现在这是要更新了爱ice丝有两条数据了,那就变成了减U爱ice丝一,撤回之前的爱ice丝一这条数据,把它更新成爱ice丝二,加U爱丽丝二。
10:20
啊,那同样来了其他的数据也是一样的操作。这就是所谓的更新日志流具体的用法。把一个table如果要转换成data stream的话,主要就是这两种方法,To data stream和to change to撤回流呢,可以认为是to changelo stream的一个底层的具体实现啊,可以认为是一种特殊的具体现。To stream这样一个转换成的方式呢,我们知道如果当前的这个表里边只有简单的插入操作的话,也就是说我们只是提取数据啊,或者说简做简单的转换,来一条数据我们就追加一条,来一条就追加一条,那这样的话,这个流我们对应的可以把它叫做仅插入流insert only stream啊,那这样的话,对于仅插入流来讲,我们可以直接调用to data stream方法去进行转换成流的操作,那如果对于有更新操作的流而言啊,那比如说这里面我们进行了分组的聚合计算,那么得到的数据它是要不停更新的,对于这种流我们把它叫做。
11:32
更新日志流,或者说有些情况下,我们可能是一个撤回流retra stream啊,那不管是怎么样,我们都可以去调用一个tolo stream这样一个方法,然后把它转换成一个流,当然了,Tolo stream这里边lo我们说它的可以有减优加优。和加I3种,哎,那当然了,如果说我们当前只有加I,那不就是警察入流吗?所以说调用to datetream的方法的地方,我们也可以直接调用一个to change lock stream,这个都是没有问题的,直接执行都是可以执行下去的啊,所以如果说我们在代码当中不确定当前出现什么状况的话,直接toloream也是完全正确。
我来说两句