00:00
到目前为止呢,我们已经完整的实现了一个flink流处理程序啊,那这个过程当中我们是只使用了table API和CQ啊,那我们看一开始先去创建了一个表环境,然后呢啊,那就是创建一张表用来做输入,这是一个连接器表读取文件,然后接下来呢,就定义表的各种查询转换操作。最后我们再创建一张用于输出的连接器表,将结果表直接写入到输出表中,就完成了整个流处理的过程。整个这个过程当中我们会发现啊,完全没有涉及到data。全部都是表的操作,而且中间涉及到的呢,主要推荐的都是CQ的转换计算。对于一般的应用场景呢,我们会发现这种方式其实是非常的友好,CQ是我们比较熟悉的一种技能啊,所以呃,如果说我们已经有了这样一个技能之后,接下来就不需要做更多的迁移,就可以直接上手去写link的程序了,那在实际测试的过程当中呢?
01:07
又会有一些其他的问题,好像我们这里呢,是直接将输出的结果啊,创建了一个连接器表,直接把它写入到外部的文件系统当中了,那平常如果说我们做本地测试的时候,一般都是在控制台直接打印输出就可以了,没有必要真正的把对应的数据写入到外部系统啊,这个操作比较重,所以我们就会想到了,那能不能我们在进行本地开发测试的时候,就直接把得到的结果表做一个控制台打印呢?哎,其实之前我们都已经说过啊,对于thentream而言非常简单,直接调print方法就可以打印输出了,但是对于table呢,很悲剧,它没有直接的print方法,所以不能直接做打印,那我们应该做什么样的操作呢?之前我们说了就是把它可以在转换成流就可以进行打印输出了,那就可以看到控制台里面的信息了,所以接下来最后我们在这一节里边需要去介绍的内容呢,就是表和流之间的转换。
02:08
啊,那首先我们要讲的就是怎么样将一张表,一个table转换成一个硫data stream。那最经典的方式当然就是直接调用。Tablena表环境的to date string方法,把一个表table直接转换成date three。啊,这其实也是之前我们在简单上手这个案例里边去做的操作啊,所以在这里我们可以把这个结果表这个先助掉,不再去写入到文件当中了,而是做一个打印输出。转换成流打印输出。打印输出。所以呃,这里边我们可以直接把result table转换成流,但是注意啊,不能直接基于这个table去调用to data stream啊,Table本身没有这样的方法,我们要调用的是table env的to data three方法里边传入一个table的对象实例,当然就是把这个result table传进来就完事了啊,那得到结果我们可以直接print打印出来。
03:12
里边我们可以指定一下当前就叫result,当然了这里还有另外一个问题啊,就是因为我们现在是转换成流data stream去进行打印输出的,那完整的一个流处理程序呢,最后一定是要有一个Env.execu要执行起来的啊,之前我们是将结果表写入输出表的时候啊,直接做了一个e insert,这个就不用最后再去执行了,那现在呢,就必须还得有一步执行操作啊,那这个最完整的这种调用方式应该我们还是前面啊要使用stream。Execution environment,基于他去创建一个table env,然后呢,呃,我们把这个Env.ECU要执行起来啊,那如果说没有这个的话,直接调table en v.ECU也是可以的啊,那当然了,现在的版本里边我们可以看到啊,Table env如果直接去调这个execu的话,这个方法其实是被弃用了啊,那所以这里边我们应该是因为点execute执行起来,这样的话就没有问题了。
04:16
好,接下来我们可以运行一下,看一看输出的结果,能不能在控制台做一个打印输出。我们知道现在这个table result table里边啊,就一条数据,那我们看到就是Alice第三秒钟的这个点击时间,我们直接把它输出到了控制台打印。整个这个过程还是非常简单的啊,哎,那所以我们就想到了,那是不是所有的得到的这个结果表都可以做这样的一个控制台打印呢,直接调to stream就可以了呢?啊,那首先我们想到啊,不管是CQ得到的table,还是调用table API转换得到的table,那其实肯定都是一样嘛,都可以直接啊我在我们这里啊,作为参数传给to date方法做一个转换,转换成流打印输出,那这里面的关键在于我们看啊,这里还有一个URL accountable。
05:08
这样一个经过聚合统计计算之后的表,能不能同样转换成硫做打印呢?接下来我们就看一眼。Table env还是to datere,我们现在传入的是URL counttable,然后做一个print,这个我们管它叫做count。然后直接。打印输出,我们看一下结果怎么样。哎,我们看到现在就直接报错了,我们看一下抛出了什么样的异常。哎,我们看到这里面他说的是当前的这个table think啊,Table的这个think任务,它并不支持doesn't support。一个有更新的改变,Update changes,诶,这是什么意思呢?啊,这其实就涉及到了之前我们所说的啊,输出结果里边,前面有一个小小的加爱。
06:05
这样一个标记,哎,那我们说这个加I表示什么呢?I是insert的首字母,所以表示的是我们当前输出的结果啊,在这个结果表里边就是追加,就是一行一行数据往后面追加的。诶,那我们就想到了,在表里边一行一行追加,那自然而然这就。可以就看成是来一个输出一个,来一个输出一个,这不就是一条流吗?哎,直接转换打印成流,转换成流打印输出完全没有问题,但是现在这一个经过聚合统计之后的这个个数呢,Ul count呢。那就不一样了,它不是每来一个数据就直接在后面insert追加的,有可能出现什么呢?哎,那我们统计啊,每一个用户,比方说爱丽丝。他点击了一次URL啊,统计了一个Alice丝一,然后后面又来了一条数据,之后他访问了第二次,那就把之前这条数据改成了ALICE2。
07:03
这很显然不是追加在后面,是要把之前的那条数据做一个更新改变。哎,所以我们看到啊,对于这种有更新改变的操作,Update changes这样的一个表,如果想把它直接转换成流去做输出的话,诶,那看起来就做不到,没有办法直接把它转换成流。因为我们这里边啊,假如说之前已经有一个爱ICE1,现在我们要把它改成爱丽丝二的话,本质上来讲,之前我们已经输出了啊,如果作为这个流式数据的话,已经输出了一个ALICE1,那很显然这个数据我们就收不回来了,你怎么改它呢?那在这个控制台里边打印,这个是没有办法去做更改的啊,打印出什么就是什么,覆水难收,哎,那在现在如果说你硬要把它转换成一个流,那就只能抛出异常了。哎,所以我们接下来要用另外一种思路去做一个解决,什么样的一个思路呢?那就是我们不光要输出当前处理的最新的这个数据是什么,还要指明当前这个数据。
08:10
到底是追加上去的,添加了一条,还是做了一个update操作,做了一个更新操作。哎,所以我们说的啊,之前这个加I这个标记,就表示它是追加上去,直接增加一条数据就可以了,那如果要是做了更新操作呢,那你加一个其他的标记,比方说update啊,我们来一个加U这个符号,这就表示要做一个对应的更新操作。好,哎,那对应的这样一个转换成流的方法呢,也就有所改变了,就不能直接调table en的to data streamam方法,而是要调一个to。我们看到还有一个方法叫做to change log stream方法。什么叫做changelo呢?Lo就是更新日志,所以我们现在记录的是什么呢?你既然不是有这个更新操作吗?那我就记录一下你的更新到底是发生了什么事情,那我就记录一下你现在不是要把这个爱丽丝一这条数据。
09:12
更新成爱丽丝二吗?那本质上啊,我这一个更新操作就可以看成把之前的爱丽丝一删掉,然后替换成爱丽丝二,不就是这样两步操作吗?哎,所以一次更新操操作写在更新日志里边的话,那就是原先数据的一次删除,以及更新之后新数据的一次添加。啊,那所以我们看一看啊,To change log stream,这样能不能正常打印,打印出来又是什么样子,哎,我们可以直接。装一下运行一下,看看现在的效果怎么样。我们可以看到现在可以正常打印输出了啊,同样上面这个result的话,那只有爱丽的一条数据,这个我们就不说了啊,它都是加I表示只有一条插入,它就是来一条数据,就在后面追加嘛,只是一个单纯的转换,不涉及到聚合,而后边我们这里涉及到了聚合,哎,那就要用这个更新日志流的方式去进行输出了,那它输出的是什么呢?哎,我们看统计按照这个数据啊,我们看一下。
10:16
Click里边的数据来了一条Mary数据之后,诶,那我们现在MARY1,这个MARY1就是完全新增进来的,之前没有数据,所以我们看它是加I。新增了一个MARY1的统计。然后来了一条鲍B的数据呢,诶,那当然了,鲍B之前也没有过嘛,同样是加I,新增鲍B1,爱丽丝第一条数据来了,那同样也是加I Alice丝一。接下来来了Mary的第二次访问数据,那我们知道当前的Mary就要由之前的MARY1第一次输出的MARY1要把它更改了。要把它改成MARY2,但是我们说打印在控制台的这个覆水难收啊,没法改啊,那怎么办呢?我就做一个更新日志的声明,所以呢,这一条数据到来之后,我看到。
11:04
这里我们统计的count值输出的是两条。它是一个更新日志,一个是减优减优,哎,那字面上我就知道它是做了一个update操作,减的话当然就是删除嘛,删除之前的MARY1。然后加U,那就是增加新增一条更新之后的数据,加一条Mar r啊,这就是把之前那条数据改了,那同样最后再来一个Bob的访问数据的时候,那我们就是减u bob1把BO1改成BO2加u Bo。这就是更新日志流的这样一个转换的过程,哎,所以我们会发现啊,对于进行了不同的。转换计算的结果表,我们最后想把它转换成流打印输出的时候,需要调用的方法也不一样啊,那一般化的这个没有经过聚合计算啊,没有更新操作的这些结果表呢,我们可以直接to stream,那如果说要有更新操作的这些表,那就必须to change log。
12:08
另外前面我们可以可能也看到了啊,直接在这里调用的时候,To后面还有一个方法叫做toend stream啊,另外还有一个叫做totra string,这两种方式呢啊,我们字面上理解啊,End就是。颈追加的流嘛,啊追加流啊,它本质上呢,就是我们说的啊,只有加爱的这些操作的一个流。那retra stream呢?Tract是撤回的意思,转换成撤回流啊,所以撤回流就是指的我们这里啊,有更新操作的时候,一个更新操作,把它描述成一个减优和一个加优,这就是所谓的撤回啊,就像我们在微信里边撤回一条消息一样,撤回重发这样一个过程。那关于这一部分呢,又涉及到了flink table API当中对于表和流进行转换过程当中的编码方式啊,这个我们会到后边啊再去进行详细的讲解,现在我们只要知道啊,如果没有更新操作啊,那普通的这种转换得到结果我们直接to data stream就可以了,那如果说要有更新操作的话,发生了update操作啊,呃,像我们做了聚合之后的这种情况,那就to change stream。
13:16
这就是把表转换成流的过程。
我来说两句