00:00
前面我们其实已经讲完了整个这个表的读取转换操作,对吧?啊,那这个其实就包含了我们在做流处理过程当中的S和TRANSFORM2部分,那最后还有一步就是think。啊,那这个SK呢?呃,其实这一部分就是表的输出嘛,在这个flink table API里边,表的输出其实是通过将这个数据写入到一个叫做table s这样的一个一个接口里面来实现的,对吧?那它其实是一个通用的接口,就是可以支持不同的文件格式啊,或者说我们这个数据库啊,或者说消息队列卡夫卡这样的,对吧?呃,所以。底层我我们其实是调用了这个各种各样外部系统的连接器,它的底层的话就都是实现了一个table s,当然table think经过解析之后,最后肯定是要转换成就是我们那个流处理最后的那个think function的,对吧?啊这个大家是能够想到的这个流程的啊,我们平常使用的话,只要会用就够了。
01:02
那输出表最常见的方式,也是最直接的方式是什么呢?呃,就是直接调一个table的点insert into方法,把一个表写入到这个注册过的一个table think里边,也就是我们这里边注册了一张表的话,这就相当于我们有一个可用的对应的那个table SK那个接口了啊,所以这里边大家看啊,我可以怎么样,前面connect一个外部系统对吧,注册一个表就叫output table,然后后边怎么样的那个得到那个result啊,这个table啊,这个结果怎么样,直接调insert into方法写进去,哎,就完事了。就可以把这个表输出了。啊,所以这个过程其实整体来讲还是比较简单的啊,那其实我们之前在这个代码当中,大家看到。这个print啊,我们这里边的打印。就是转换成一个APASTEM,然后打印,这其实就是一个输出的方式,只不过是输出到控制台了嘛,啊对吧,这也相当于是一个think的方式,那对于这个平常的这种方式而言呢,呃,大家更常见的当然就是。
02:11
定义好这个table之后,直接insert into,而不要再转换成留在print了,对吧?啊,所以接下来我们给大家测一测这个输出,给大家写一写这个输出的代码。好,再去扭一个。呃,这个输出就又得分开写了,对吧,因为可能这个代码会不太一样,我们首先先呃,以这个还是输出到文件作为一个例子啊,我们来测一下这个文件的输出。FS output。呃,Test对吧?好,先把这个创建出来,呃,然后前面这个代码大家知道,既然是测输出嘛,那前面其实创建环境读取数据做转换,那不是基本上都差不多嘛,对吧?啊,所以这里边我直接这个把这个example啊,这个里边的东西都给大家copy过来吧,呃,我们这里就稍微简单一点,呃,我我们先去按照这个啊,Table API test里边我们先把这个表环境先创建出来,对吧?这里边这表环境创建的比较简单。
03:20
好呃,第一步创建表环境,然后第二步后边当然就是读取数据了啊呃,这里边这个呃,读取数据源,我们还是从这个。就大家可以从这个文件里边直接读啊,按照我们定义的这种方式读,大家如果要是觉得有点别扭的话,你你按之前这个呃,先转换成流,然后再map成样例类啊,再去再去把它转换成表,这种方式也是可取的啊,所以这里边我们直接用这种方式吧。给大家看一看啊。上边转换过来的时候,把这个影视转换下划线这个引入,然后另外这个table as GALA这个下边也是,因为后面也涉及到影视转换,对吧,这里边也要把它引入,呃,我们这里边本身这个是。
04:11
上面少了个东西啊。少了一个这个input stream,对吧,从文件里边读取。呃,这个后面长度有点长,我们直接把它先弄过来。放在上边。好,这个input stream先放在这儿啊,这里边是这个呃,流这个读取数据转换成流对吧。然后接下来,呃,而且是卖成样一类对吧。Map成样力类,然后接下来我们是把这个流转换成表。把呃流转换成表啊,这个过程其实也非常简单,哎,我们就直接对应刚才实现的那个对吧?Sensor table,我们定义一个table table,诶这个还要引入啊。
05:10
Flink table api.table啊,用前面定义的那个table env直接from data stream对吧?大家还记得这个方法,然后在这里边我们可以直接把这个data入后边呢啊,这里捎带给大家复习一下,还可以定义那个按名称字段去定义对吧,比方说这里边我ID。呃,ID就不用,不用再重命名了啊,然后后边比方说第二个字段本来是stamp,哎,我不想让它放在最,放在前面,我放在最后,把temperature temperature温度值放前面,那比方说我这个temperature,呃,重命名这个太长了啊,Tempmp对吧,叫做tempmp,然后后边这个time stamp。重命名叫做TS对吧,后面我使用的字段就变成了ID tempts啊,就这样用就可以了。然后接下来。
06:01
呃,这个进行表的转换操作,诶,那这个表的转换操作,这个前面都已经该做的都经做了嘛,我直接抄吧,对吧,API这边抄过来啊,大家如果要是两个都想要的话,我们可以把这个直接都copy过来啊。好直接放在这里啊,这里边就是4.14.2。好,这个result table和aggregate啊,A j result table现在都有了,现在就差真正的输出表了,对吧?我们这个代码核心是想测这个输出啊,所以呃五我们将。结果表输出到文件中,诶所以要输出到文件中,那是不是应该先得定义一个到文件的连接对吧,创建一个这个输出表对吧?哎,所以这里边创建表的连接怎么创建呢?Connect嘛,哎,那之前我们不是用过吗?File system。
07:04
用这个对吧,呃,里边我们这里边要去点pass,诶,这里边还得传一个路径,这个要不我们。把这个路径直接在上面写出来吧,对吧。这个pass。一个string类型啊,直接把它定义string类型,把它定义出来。我们把这个copy一下啊。稍微有点长。好把它定义在这儿,那我们这里面瑞的这个。Fail的时候直接传这个pass就可以了啊,后边我们这里边也是啊,直接传这个fair pass,呃,这个,呃,这里边,哎,其实不应该这么定义啊,我们这里边要的其实不是这个pass对吧,你输出的话跟之前那个肯定不能一样吧,所以我们这里边算了,还是还是定义一下吧。我们就定义在当前这个目录下边,我不要叫sensor了,我叫这个output可以对吧,叫一个output.txt啊,这是这个connect啊,先把它做了一个连接,然后接下接下来该定义的这个format和STEM一样,不能少,对吧?啊,比方说现在我用那个新版的CSV去做格式,呃,格式化。
08:22
之前我们把那个包已经引入了,所以直接这里边用就可以,然后另外skima skime这里边的话,我去new一个skima。里边要去定义它的字段,这个字段稍微有点麻烦,先把这个引入啊。这里边我们要用的是,呃,这个flink table scripts scheme对吧,然后接下来啊,Field的。首先啊,我们就少点吧,我要一个ID对吧,ID的类型是data。Data types。Data string。
09:02
然后后边还需要一个比方说我要一个温度值啊temp。Data types这个是一个double类型对吧,一个一个这个。浮点数啊,Double啊,这里边那个。我们把这个选中之后对齐一下。有了这个format和STEM之后,接下来那就把这个表创建出来,注册在当前的表环境里边,比方说当前这个。就叫做output。Output table对吧,注册出来这就有了,然后接下来怎么做呢?直接输出对吧,就是result table,直接点insert into,然后给到这个output table里边来。大家看就这么简单对吧?啊,然后最后大家还是不要忘记啊,不用再print了,但是你千万不要忘记,我们当前的这个程序得执行起来对吧?呃,这里边把这个呃,哦,这里边上面大家看到。
10:09
这个copy过来之后,我们那个少了一个,少了一个斜杠,对吧,因为Windows下边这里边应该是双斜杠才能指代这个路径。前面copy过来之后,直接copy在外面,不是字符串的话,它默认没有把这个加上啊,啊,所以这里面大家还是要跟之前的这个要定义的一样,大家看具体的情况啊,我们这里边的这个名字叫做FS file system output test drop。好,那接下来我们直接来跑一下,看看效果怎么样,我们是。就是直接定义了这两个字段,然后去输出对吧,呃,一个ID一个temperature,然后也没有做任何的别的那个复杂的转换啊,直接呃得到的这个哦,这里边大家要注意是呃啊对前面我们这里边大家看到啊。这里报错了,我们看一眼。
11:02
大家注意啊,这里边呃,就是我们要选取的这个字段呢,之前因为已经做过转换了,这里边就稍微麻烦一点啊,就是本来我这里边的这个字段,这个已经叫做temp了,对吧。啊,结果呢,我这里边在选取的时候,我要提这个temperature,那肯定找不到啊,对吧,大家看你这个,你要提这个temperature这个字段,这里边是ID和TS,那这怎么提嘛啊,所以这里边改一下改成temp对吧?啊后边这个都是ID,这个就没关系了啊,我们重新再运行一下。好,我们看看这个运行的结果,大家看这个是正常退出了啊,正常做完了退出对吧?啊,那我们得看看这个文件到底写入进去了没有啊,诶大家看多了一个output.text对吧?然后我们打开会看到这里边只写了三条三一的数据,为什么只有三一的数据呢?啊,因为前面我们做了filter嘛啊,我是把这个结果表要写入,所以这里边就直接写进去了。
12:04
啊,这个过程就是还是比较简单比较直观的啊。呃,然后接下来给大家就是稍微再做一个测试,就是可能大家想到那这个aggregate啊a table是不是也可以一样写入呢。诶,这其实是一个非常好的问题啊,我们在这里边给大家把这个做一个做一个示范啊,Insert into,同样还是output table,把它往里写,假如说我现在不是写那个数据了啊,我直接写这个a j result table直接做这个。我们来看一下,跑一下试试。好,大家看到,诶,现在这个代码已经报错了,哎,我们看一下他报错报什么错啊。哦,当然这里边这个这个问题是我们数据类型不匹配对吧?啊,这个这个是因为这里边给了前面我们这个是呃,Double类型的那个temperature嘛,后边这里边这个是个count对吧?啊,所以这里边还得调一下,我把这个注掉啊,然后这里边给一个。
13:08
Field,呃,这是CT对吧,Count。然后date data types,这是一个呃,Big int对吧,没有长整型,所以是一个big int,我们把这个先写出来啊,然后接下来再来看一下这个。运行的结果。看看现在又报错了,呃,看看他这个现在报的错是什么,大家看他报的措施。End stream table think requires that table has only insert changes。一看这种表述,大家一看到这又是跟那个颈追加和能不能更改有关,对吧,就当前他只要求必须只能是做这个插入的写入,所以我们当前这个a j result table,它不是做了那个,呃,一个一个这个更新吗?对吧,我们表里边那个字段要一改成二,二二改成三吗?诶这里边它就要求不行,我这里边不能写入当前的这个table think写不进去。
14:10
为什么呢?因为它是个a pentream table think。啊,这又是个什么东西呢?其实我们能想到你往那个文件里边写入的时候,你说文件它怎么可能就是允许你之前一行已经写进去了,然后你在更改之前写过的某一行,他肯定只能是往后追加,对吧,只能追加,不能修改之前的内容,所以在这个过程当中,你就必须是往后追加,一旦出现修改的这个操作不允许做。哎,这里底层的这个含义,其实我们可以看一下啊,它的底层其实是这个啊,叫做我们当天不是那个CSV嘛,对吧,它的底层是一个叫做csv table think这样的一个实现,它实现了这样的一个table think啊那么它实现的接口是什么呢?呃,这是批处理的对吧,Batch table think,另外刘淑理是什么呢?A panda stream table think。
15:03
啊,所以大家看它只实现了追加式的流式的这个table的S啊,所以这里边我们提示就说这个table think,你必须是往后追加,不能做更改。那如果说我们能要去更改的话,那得怎么样呢?那就是你这这个外部系统这个连接它得支持才行,对吧?啊,你如果是直接这里边去,他不支持你硬要写,那就报错,肯定是写不进去。啊,所以这一部分就给大家先注掉啊。呃,我们这里边能写的还是用这个,前面给大家提到的这个,就是不做聚合转换,直接提取的这个东西,可以直接写到文件里面。
我来说两句