00:00
好,那现在大家已经知道我们在代码里边可以创建一张表,这相当于是流处理里边的source任务,然后呢,后边我们做了表的查询,不管是用table API还是用CQ,不管是简单查询,简单查询相当于我们之前讲的那个简单转换,对不对,Map filter对吧?啊,那呃,也不管是像我们做了聚合之后,这就相当于之前我们那个KBY之后做聚合计算的那个过程吧,不管是怎么样转换,这都相当于是一个transform的过程,那最后还剩一步,是不是接下来我们应该有一个输出,有一个think呀,哎,那think的话,前面我们也已经说了,整个的那个表的处理流程里边,是不是就相当于我们在定义的时候可以不区分到底是到底是这个输入的还是输出的,是不是都把它创建出来就可以了,创建出来之后最后要think的话怎么办?是不是直接一张表in色into就完事了啊,所以这个过程其实特别特别简单啊啊,那接下来我们就在代码里面先。
01:00
给大家实现一个最简单的输出,比方说我们想写入到写入到文件里面,既然之前我们讲过那个连接文件系统嘛,所以接下来我们就讲一下这个啊,Table test3当前是输出到文件。File output。那前边的流程的话,这个大家是不是觉得就非常简单了,我应该是就跟这个差不多对吧,我先把这个环境先创建出来,然后我这里边直接就用这个最简单的啊创建创建环境的这种方式。好,先把这个写在这里啊。然后下边我先把这个写出来,最后应该要有一个Env.execute执行起来的这个过程,然后接下来就是一步一步操作啊,第一步这个环境我们就不说了,后边是创建,那大家看我先连接这个外部数据库,把这个三四数据先读进来,对吧,然后创建这个表,后边呢,我可以做这个,呃,查询转换这个干脆我都copy过来吧,这都没没什么问题啊。
02:07
接下来我们是想把得到的一张表是不是要做一个打印输出啊,呃,我们现在不是打印输出了,输出到文件对吧?之前我们打印输出是转换成流打印的嘛,现在我们不是了,我们想输出到文件,输出到文件,那这个怎么输出到文件呢?是不是首先应该要对注册一个输出表啊,对吧,我们是。连接外部文件注册输出表,那这个注册输出表的过程是不是应该跟前面注册输入表的过程是一模一样啊,对吧,只不过就是。当前的这个路径会不一样,然后诶我把这个写一下,这个叫output output pass fair pass对吧,Output pass这里边同样还是连接的时候,Connect file是不是就应该传这个output pass呀,然后里边这里我们不要直接放在这个三四里面啊,我直接改一个名,比方说我叫out,这个可以对吧,直接叫out.TXT,然后同样我输出还是用这个CSV,这也没问题,然后这个sche码是不是要要改变啊。
03:20
这是不是要跟我们输出的那个数据本身那张表的结构要完全一样啊,比方说现在我们直接就把这个result table要做一个输出,大家想我这里面应该给什么呀?我当前是不是就应该是ID还是string对吧。后面是不是temperature就没了?对吧,当前这个,呃,那个time就没了,对吧,但是后面temperature是不是应该有啊。大家看我这里边最后提取出来字段不就这两个吗?对吧?所以这里边我应该要有这两个字段,当然我这儿的输出的话,是不是我可以跟那个不一样啊,因为这是我注册的那个表的名称,对吧?那这个我新的这张表的名称是不是可以跟你这里边中间转换的这个表的名称可以不一样啊,那这个是没问题的,对吧?所以这里边我给一个temperature,那对应的就是我连接到的,呃,这个当当前对应的那个那个数据啊,我是要的是这个temperature这样的一个字段,对吧?然后接下来这个就改一个,这个叫output table,那最后来一个什么操作,是不是直接result result table去掉一个insert into是不是就完事了,这里边我直接把这个out table写进来,Output table大家看就这么简单对吧?好,那接下来我们运行一下,看看这个效果怎么样。
04:44
这个我们就是试图从一个文件里边读取数据,然后做一个非常简单的转换,然后把得到的结果再写入到另外一个文件里面去。好,大家看现在已经exit code0啊,这个正常结束已经输出,我们看一眼当前这里刷新一下啊,大家看是不是多了一个out呀。
05:06
这个al里边数据,诶是不是就是346啊,因为我们提取的不就是它吗?啊,所以就这么简单啊,当然有同学说,如果说我这里边用的不是这个result table的话,我用的是。大家可能会想到啊,我这里边如果不用这个result table,我要把这个a j table要做一个输出,可以吗?来运行一下,大家可以看一下啊,这a table这又是一个什么效果。看这里边直接报错了,为什么会报错呢?哎,其实跟我们之前的啊,当然这里边首先是有一个问题,就是我们这个类型不匹配对吧?哎,这个它也是想套的,这个本来我们这里边是只有两个字段,我这儿输出的是三个字段对吧?而且是这个类型都不一样啊,所以中间是不是还有至少是有一个这个呃,一个长整型的这个count值对吧?所以我加一个build啊啊那这里面比方说我这个叫呃,CT对吧,然后后面来一个data types.big int啊,那现在的话这个是不是就可以输出了呢?
06:23
看一下这个结果怎么样啊,会不会继续报错?看到还是报错了,那我们再看一下他说什么。大家看他说的是。Upon stream table think require,这里只能有增加的信息,对吧?增加的改变就是插入insert的改变,这是不是跟我们前面转换成流的时候那个有点像啊,所以大家会想到当前我这个转换成流的话,它是不是每一条数据,每一条改变都得带来两条数据的。插入啊,但是大家想那两条数据,我如果写入这个这个文件的时候,我能根据它那个true false那个就能就能,呃,把我之前的那个数据能改掉吗?好像做不到对吧,文件系统怎么能直接找到之前的那个数据能改掉呢?所以大家就会发现了,输出到文件这里其实是不支持聚合之后有更新操作的这种写入的,就像我们那个topa stream不能直接打印一样,对吧。
07:26
啊,那所以这里边如果要是给大家看这个本质的话,它其实是什么呢?其实是大家看是这个啊,本身我们这里边文件系统,它底层是一个csv table s。它实现的这个接口只有什么呢。只有batch table think批处理的写入,另外还有a pen stream table s文件是不是只能往后追加的这种流失写入啊,如果要是待更新的那种流失写入是不是就搞不定了啊,所以大家就看到了,为什么报错报这个呢?就是因为这里边它只实现了这样一个追加的方式啊啊,所以大家可以看到就是往文件里边写入数据的时候还是有一些局限的啊,这就是这一部分。
我来说两句