00:00
现在已经知道怎么样从文件里边读取数据啊,然后变成注册成一张表,然后呢,我们把它转换出来,呃,这里边我们没有做转换操作,但是大家知道已经有了table,而且我们在这里边也已经注册过,那你接下来其实是直接调用table API或者是直接写CQ都可以,对吧?啊,所以这里边接下来就是一些转换操作了,我们这里边是只讲了这个读取,这里边还有一个问题就是。有些同学可能看着这个带横杠啊,已经弃用的这个方法就不爽啊,那这里边我们用的是这个OLDOLD的CSV,它为什么是O呢?这个是flink啊,以前的版本里边就就已经实现了的,自带的一个这个格式化的描述器,那那从名字上来讲就知道啊,老版本它是什么呢?它是一个非标的一个状,一个一个这个实现,所以跟很多外部系统,它说是这个CSV文件,但是呢,有很多那个格式化的标准跟外部系统不兼容,不通用啊,所以接下来呢,这个就要被弃用了啊,但是有时候我们还用它,就是比方说这里边我这个文件就非常简单对吧?啊,就是一个文本文件,然后逗号分割,那你用它其实是没什么问题的。
01:10
主要就是说你后面如果要是跟卡夫卡连接对吧,跟其他的一些外部系统连接,那那个可能是不支持这个O的CSV的推荐用用什么呢?以后那当然既然有old就有new嘛,以后的新版本就直接不叫不叫new csv啊,就叫CSV啊,这个我们在呃这个文档里边也给大家写出来了,就是后续,如果要是你直接使用的话,直接就你有一个CSV就可以,但是注意这个不是flink现在的版本直接自带的,你需要再引入依赖啊,引入一个什么呢?Flink csv这个包。对应的版本引入啊,这也是官方提供的嘛,直接引入进来之后,哎,那后面你这里边就其他的定义都完全一样啊,该定义这个路径,定义路径后边就是这个with format的时候,这里边传一个,呃,就是你有一个CSV就可以了。
02:01
这里就不给大家再再试了啊。好,那接下来我们还是给大家讲另外一个读取文件的方式啊,因为如果从文件里边去做读取的话,我们感觉就好像还是这个数据已经先到了那儿了,然后这里边再去读,对吧?呃,这里边就是这种读取的方式,大家可能也会看到,在这个呃,Flink官方文档里边连接外部系统的时候,对于这个文件,文件系统的这个定义啊,它的读的方式呢,可以有批处理的,这个就是一批读进来,然后一批处理,这种方式也可以是怎么样呢?以流式的这个模式读取。对吧,也可以有这样的一个一个方式去读,所以说我们这里边可以以这个流式处理的方式,把它一行一行读进来,然后一行一行去做转换处理,这个是没问题的,但是我们的感觉上总是觉得,哎,我的数据既然是实时来的嘛,那就不应该是已经保存好了,变成了一个文件,那可以怎么样呢?呃,这里边就还有就是。
03:04
更加常见的流式处理,那应该就是卡夫卡了,对吧,所以这里边我们呃读取。消费卡夫卡数据。所以接下来我们再来实现一个啊,这个就不用不用那个定义变量了,我们直接连接卡夫卡,然后。定义一些配置配置项对吧,然后把这个写进来就可以了,好我们试一下啊。Table,呃,Env.connect还是连接啊,这个方式方法就非常的类似,那这里边要连接的话,里边传的这个连接器描述,呃,描述器对吧?连接描述器这个要传什么呢?啊,当然是卡夫卡的一个连接的描述器了,那这里边我们需要自己写吗?还是说可以有这个依赖引入呢?啊,当然不能自已写了,之前我们已经引入了卡夫卡的依赖对吧?这里边我们引入了这个flink卡夫卡连接器,大家还记得这个,呃,Flink connector卡夫卡在这个连接器里边不光给我们提供了流处理的consumer和producer,另外呢,还为这个table API提供了一个这个表的连接描述器,那这个就直接就叫。
04:16
卡夫卡诶大家看这个对吧,Flink table script里边有这个卡夫卡的这个描述器啊,啊,这这个不是不是卡夫卡连接器那个提供的啊,这个是我们在这个呃,Flink的这个table table API这一部分里边,给我们已经提供好的这个卡夫卡这个连接连接器的描述器啊,这是然后这里边描述器里边呢,还需要给它定义一些关心的。一些配置项啊,因为你这个你直接就说我要连卡夫卡,谁知道你要连哪里呢?首先这里边要有一个version啊,这个是必须的,我们这里边是0.11对吧?啊,首先是0.11的版本。啊,我把这个分别写出来啊,Connect分别写出来。
05:01
当前是定义版本卡夫卡的版本啊,然后接下来是topic对吧。当前的这个主题必不可少啊,比方说我们现在读取的数据啊,还是叫sensor吧,就对应我们之前那个data stream API里边的那个写法,我们当时不是也有这个卡夫卡源吗?啊,这里边我们是定义主题。啊,主题啊。然后接下来还需要什么呢?呃,当然还应该有那个proper了啊,这里边你可以直接定义properties,传一个property在外边写好,也可以单独一个一个去配啊,我们这里边这个必须要配置的其实就两项啊,一个是那个zoo keepper,然后是一,另外一个是BOO STEM service啊,所以这里边我先配一个zoo keeper.connect要配这样一个字段里边啊,那当然这个我这里启动的都是本本地对吧,Local host2181。
06:05
然后剩下就是。Tramp service。mo.service这里我同样还是本机local host。9092对吧?哎,只要这几个必备的选项,这里边定义好就可以了啊,然后接下来啊,那大家看到这里边我并没有定义它是这个就是消费者还是生产者啊,哎,所以大家看这就是这个方便的一点,我只要判断它连接就可以了,不管它到底是生产者还是消费者对吧,我只要能连上那个卡夫卡的那个服务器,然后接下来呢,就看整个处理过程当中的这个定义,如果定义的诶,它是一个S端的话,那后面肯定我调的就是那个弗Li卡夫卡consumer对吧?去消费数据,如果定义出来之后,它最后是要做这个这个SK端的输出的话,那最后肯定就是变成了一个弗link卡夫卡的producer对吧?因为大家知道底层的话都是要转成那个流处理的那个程序的嘛,Data stream呃的程序,所以说前面我们其实不需要给它做这个明确的定义,只要后面直接用就可以了。
07:18
好,那接下来这个connect完了之后,大家记得还有这个withchema,对吧,With format,那这里面我们先写这个with format啊。对于这个卡夫卡而言,诶里边现在啊,卡夫卡支持的这个相当于这是一个序列化工具了啊,这里边支持的格式有什么呢?目前支持的有CSV,还有Jason,还有一个,呃,目前就支持这这三种,那这里面CSV当然就不能是就是前面我们的这个old的CSV了,对吧?啊,那就必须得是新版的标准化的CSV啊,所以这里边我还是得把对应的那一个依赖先引进来。在这儿啊,这个弗林CSV。
08:00
把它放在这个文件里先引入。好,然后接下来在这个代码里边,当然就是直接去new一个CSV对吧。再看一下,诶,这是old啊,我们看一下现在可以引入CSV,诶,然后当前的这个CSV大家看这就是呃,这个在flink csv里边给我们提供的一个东西了,对吧?好。呃,然后后边这个skima其实也是类似的啊啊,我们之前你这里面定义这不就是一个表结构嘛,所以前面你是这么定义,那后边其实要定义也是一样嘛,只要我们数据类型一样,所以我就直接把这个抄过来就完事了。所以后边这里的这个写法大家就看到啊,这个基本上就是一大抄,别的那些,呃,外部系统也是一样连接对吧,按照这个标准,你该怎么写怎么写就就可以了啊,需要的东西都都列进来就完事了,那最后不要忘记,你要创建表的话,必须还得有一个。
09:00
Create temporary对吧?这里边给一个名字,比方说我叫卡夫卡。Table,诶,这就是我们的这个写法啊,当然后边如果说我这里边建立了这样的一个连接,那后边我再去读这个做转换的时候,我就可以直接from这个卡夫卡input table了,对吧?啊,那那边我输入的那个数据这里边,呃,这个通过那个consumer读进来之后,就会变成我们的这个数据源啊,就是这样的一个流程。这里面就不给大家去简单的再做这个测试了啊,大家知道接下来这个做法其实是一样的,我们把那个呃,卡夫卡那边提起来,呃,然后呢,我们在那边创建一个生产者,然后这边启动代码,这边去消费,对吧,所以这边是消费者,那边我们有一个生产者,一条一条书,这边应该能够看到启动之后一条一条,呃,这个就是那边一条一条输入,这边能看到一条一条输出。好,这是这一部分内容。
我来说两句