00:00
我们已经做完了表的创建和表的查询转换,那对应在数据流里边啊,Data streampi里边的source和transform两步操作就都已经完成了,那最后呢,我们还应该有一步think任务啊,那就是把处理的结果再输出到外部系统当中,那对应在这个表的处理过程当中呢,当然就是要创建一个输出的连接器表,然后再把它写入到这张表里面就可以了,哎,那所以这个过程呢,跟前面我们这个创建输入表的其实完全类似啊,所以想要去进行输出的时候。我们做一个输出表的创建。诶,那这个过程我们完全可以去copy之前输入表创建过程,当然了现在会有所不同,哎,那比方说我们现在呢,就不要叫做even table了,比方说我们直接叫做。Output table啊,然后呢,对应的字段,那也得是看我们到底要去写入什么样的数据嘛,比如说我们就把这个result table要去做一个写入这张表里边所有的数据呢,我们看到啊,其实就是三个字段啊,那当前所有的三个字段其实是没有发生变化的,那我们直接还写u ID string URL string,然后TS,这个是完全没有问题,当然了我们也可以把它去做一个更改啊,因为当前啊,我们要写入的三个字段,这个是表里边的名称,那至于说写入到这个auto table里边,在这张表里边叫什么名呢?诶这个其实不重要啊,跟我们之前可以一样,也可以不一样,这就像我们在这个MYSQL里边啊,你去创建两张表,这两张表里边它有可能这个对应的字段啊,呃,是相同的类型,是相同的含义,但是呢,你给的这个名称列名完全可以不一样了。
01:47
所以这里边比方说这个UID啊,我们可以直接把它叫做user啊,当然了,User如果我们考虑到它是这个CQ里面关键字的话,我们可以叫一个username,这个是完全没问题的啊,那后边这个TS,我们可以把它叫做time STEM time STEM是关键字的话,我们也可以改一下啊,可以直接加上这个反引号,哎,这样的话就完全没有问题了,所以这种方式还是比较常见,比较简单的啊呃,那对应的后边我们的连接器呢,也可以直接输出到文件里面,文件系统file system,那pass呢,诶,这里就不要指定单独的文件了,因为我们这里面默认的输出啊,其实就是一个分布式文件系统的输出方式。
02:27
这里我们并行度是全局已经设了一了,其实如果说呃,并行度不给一的话,也是可以做并行输出的,比方说我们这里边直接就给一个目录。就叫做output,然后同样格式化的这个工具还是CSV,那接下来我们就可以执行这样一个操作。啊,这里还要注意的啊,就是我们创建这张输出的表呢,这并不代表已经把这个数据写入到文件里了,因为这只是创建了这个连接到文件的我们对应的一张表而已嘛,啊相当于只是建立了一个连接,那接下来真要把数据写进去的话,那还得有诶,就是指定啊要把哪张处理结果表里的数据写入到这个output的table里面啊,所以这里边我们执行的操作那就是。
03:19
第五步。我们要将。结果表写入。输出表中。哎,那所以这里边我们执行的啊,其实就是对前面我们得到的result cable调用一个方法,这个方法就叫做execute insert啊,那当然了,之前我们看到还有一个已经被弃用的方法啊。叫做insert into,哎,这个也非常的直白,那就是说直接把它的信息啊,插入到对应的哪张表里面,我们现在呢,要这个要被侵用了,我们执行的是execu exsert,哎,那里边要传入的呢,简单的就是一个string,这个string很明显就是输出表的名称,这个名称呢,必须得是已经在表环境里边注册过的啊,那所以当然了,我们这里边创建连接表的时候已经注册了嘛,就叫做outut table。
04:13
这样的话,就把我们输出的结果写入到了对应的文件系统当中,这就是这个执行的过程。啊,这里还需要多说一句的是,在之前我们进行data vpi调用的时候呢,啊,最后总是不要忘记啊,有一个烟v.execu要执行起来流处理吗?那现在我们发现啊,已经没有env,没有这个流失的执行环境了,那怎么办呢?哎,那是表环境要做一个执行起来吗?没有必要,我们看到它本身最后一步操作就是execute insert,所以已经带了这一步执行操作,所以接下来呢,我们不需要有任何别的定义了啊,所以这个其实就是跟我们流式处理那个程序啊,整那个整套逻辑都已经没有关联了,直接按照表去进行转换处理就可以。
05:01
好,所以接下来我们就可以直接运行一下看看这个测试结果了。运行。啊,我们看到这里直接报错了啊呃,这个报错的信息呢,主要是因为我们这里的这个format啊呃,格式化的这个工具,CSV对应的这个依赖我们没有引入,哎,那这个依赖我们要引入什么呢?啊,就是我们说的这个file system对应的这个支持啊,是已经有了,但是CSV这个格式当前link默认是没有内置进去的,那所以我们现在还需要引入一个依赖啊,其实关于这个连接器表啊,连接到不同的外部系统的这个操作呢?呃,我们后边在11.9这一节还会做详细的讲解啊,那这里边我们就会介绍连接到各种不同的外部系统啊,这个CQ到底应该怎么去写,那这里我们就不详细去介绍了啊,关键就是说我们如果使用到了CSV这样的一个格式化工具的话,诶,那我们需要引入对应的依赖支持,那就是flink csv啊,那对应的版本呢,跟我们当前flink的版本还是一致的,我们需要把这个copy到po文件当。
06:09
中。好把它引入我们看一下。已经在每文当中引入了对应的东西了,接下来我们重新执行一下这一段代码。我们考察一下能否正常的输出结果,诶,我们看到现在就没有报错了,正常运行完成,好,那么我们到上面的目录去寻找一下outut下边,诶,我们看到果然就多了一个这样的路径和文件,我们看到当前筛选出来的数据呢,啊,就是Alice,它对于ID唯一的商品页面第三秒钟的一次访问,哎,那其实我们知道啊,对于当前所有的数据里边,确实爱ice就只有这样一条访问数据,那当然筛选出来就完全没有问题了。
我来说两句