00:00
前面我们讲过了,呃,输出到文件,把这个表里面的数据输出到文件,当然我们也可以连接到一个像卡夫卡这样的一个消息队列,对吧?而且我们说在流式处理里边,卡夫卡的应用是非常广泛的啊,那之前我们在学习这个data stream API的时候也讲过啊,弗Li跟卡夫卡哎特别匹配起来工作,这个做的特别好,我们经常就可以怎么样呢?数据源那边读一个卡夫卡的S,然后呢,经过弗link转换操作处理之后,最后再由卡夫卡做一个输出,对吧?啊,这样的话,这就是一个完整的数据管道啊,那卡夫卡如果要做输输出怎么怎么做呢?啊,其实大家看非常简单啊,跟我们之前那个SS那边定义的那个方式是一模一样的,对吧?呃,所有的这些都是先连接上卡夫卡,然后呢,诶后面我配置这个format,配置STEM,注册一个表就完事了。那至于说它到底是think还是S,那是通过我们后边那个处理转换过程定义出来,哎,然后这个我们的那个,呃,就是flink底层会table API对吧,帮我们会解析出来,转换成对应的source和S,调用那个连接器里边的consumer或者是producer,所以这里边其实我们根本不用考虑到底是什么东西啊,只要建立连接就完事了。
01:19
啊,所以先接下来我们来做一个这个呃,从卡夫卡读取数据,再输出到卡夫卡这样一个数据管道做一个测试,啊,那这里边我们就写一个新的object,这个我们就叫呃,卡夫卡pipeline。Pipeline大家知道是那个管道的意思对吧?应该叫data pipeline啊,数据管道,Test。好,然后这里边的这个main函数,呃,前面这一部分的话,那那其实就是就是一样的了,对吧,我们直接从这个呃呃,这个既然是卡夫卡源,那肯定我们就不需要去,呃再去读取这个文件,读到这个流里边,然后再做转换了,对吧,我们直接用那个table API,当时我们做过那个卡夫卡源嘛,直接把这个copy过来啊啊,然后我们前面还是先创建这个表的执行环境。
02:13
先把它这个写入进来,该引入的影视转换做一个引入table API,这两个都得引,对吧,这两个都引进来,然后接下来呢,后边这个读取数据的时候,哎,我们直接跳到后边这是2.1是文件啊,卡夫卡对吧,终于到这儿了,消费卡夫卡里边的数据,先把这个创建出来,创建一个呃,就是卡夫卡input这张表对吧?然后后边当然是要把它转换,呃,这个直接直接变成这样一个input table,那后边我们甚至可以把这个就是聚合转换都抄过来,对吧?啊,到时候我们看一看这个到底是怎么做啊,好,我直接把这个都抄过来吧。抄到这个表的转换这里啊。诶不对,我们是从这个消费卡夫卡数据这里,对吧。
03:00
抄到这里啊。呃,这个代码其实就是大家如果已经知道它是什么用途,知道它这个用法之后啊,就是来回抄啊,我们主要是测这个自己比较关心的这些功能,新的功能我们专门去好好敲一敲,前面这个类似的东西我们就不详细去写了,那这里边我们就直接写,这是第二步对吧,消费卡夫卡数据,然后呃,注意啊,这个我们还没有直接消费卡夫卡数据,所以是定义啊,定义这个。呃,卡夫卡。这个定义到卡夫卡的连接创建,这里边建立的是一个输入的表,对吧?输入表,然后后边我们要输出到卡普卡的话,那还应该有输出表对吧?啊,所以这个过程其实是非常的类似啊,啊那接下来我们就是已经有了这个table,还有a j table之后,那接下来呃,我们就是输出到卡夫卡了,那同样这个还应该是env connect啊,再去注册一张表,对吧,Connect卡夫卡,所以这个过程跟前面几乎一模一样啊,那这里边我就不再详细去写了啊,我们直接把这个抄过来算了对吧?啊,这个这个写法确实是没什么,没什么有特点的地方啊,就直接copy啊。
04:22
Copy比过来,然后这里边要改一些地方,那前面这个版本,注意主题要改对吧,你不能还是输出到之前的那个那个主题啊,啊,所以这里边我们得换一个主题,按之前我们那个写法s test对吧?啊,我们做一个这个主题,然后z kper,还有这个卡夫卡的服务器,这个bootem servers这个都要定义出来,后边呢,Format还是用CSV,然后定义sche,注意这里输出的schema就要根据我们得到的那个数据做调整了。比方说现在我们是这个result table啊,那大家看就是一个ID,一个温度值,一个temperature对吧?呃,这里边这个,诶我们看一下这个temperature是没有问题的吧,前面我们没有定义那个字段吧,啊,这个没有问题对吧?这里边我们这个STEM里边定的是temperature啊,不是定义的temp,上次就有这个不匹配的一个错啊,然后这里边我们已经把这个得到的是一个string和一个double类型啊,那接下来就在这个stemma里边就删掉这个这个time stamp对吧,留下这个time,一个double类型,这样就可以了啊,那或者说我这个嫌它这个太长,我就叫temp,这个名字可以不一样,对吧?啊,这个是没关系的啊,这个表的名字这是没关系的。
05:38
然后后边我得叫一个不同的表,对吧,我叫卡夫卡output table啊,这样的话接下来就是输出,那输出的时候result table,直接insert into啊,这个卡夫卡output table对吧?直接把这个输出出来,最后不要忘记执行起来,把这个代码执行起来啊,当前是卡夫卡pipeline的一个测试,Test drop。
06:05
好啊,大家看这个整个过程非常的简单,我们关键是看一看这个效果怎么样,做一个测试啊,我这边还是要先把卡夫卡那个骑起来啊,先看一眼这边卡夫卡起不起着。哦,不起着不起着,那我们去这个,呃,首先啊,我去这个look k下边把这个look k骑起来对吧。呃,这B下边z k server。Start,先把这个提起来。然后我们接下来是启动这个卡夫卡看一眼啊,接下来了,接下来进这个卡夫卡下边对吧?呃,并下边我们去启动卡夫卡的话是卡夫卡server start对吧,然后后边呃,比方说我们这个经过启动啊demon。DA mon啊,然后要指定当前的那个server pro,呃,Properties配置文件对吧?把这个先指定出来,然后后边大家想到,既然这里边是一个呃,一个这个数据管道,我们这里边只是用这个table API做这样的一些处理,那显然就是flink,这边控制台是没有输出的,对吧?控制台不做输出,那后边我们这里边就得有这个producer,也得有consumer对吧?呃,数据管道嘛,卡夫卡进,卡夫卡出,所以接下来啊,我先看一眼这个卡夫卡正常提起来了吧,好,提起来了,接下来我们创建,先创建一个生产者,这里边我们要用卡夫卡console,呃,定义这个生产者producer,呃,里边我们要杠杠,呃,BOO,啊,不是啊,是broke list对吧?Broke list local host 9092,然后后边杠杠topic。
07:49
生产者,诶大家注意,这里边生产者我们是要把那个数据生产出来之后,由谁消费呢?诶这里边连接之后。
08:00
这个卡夫卡input table去做消费对吧,而是这里边要去做消费,而且这里边我们后面这个查询转换的时候注意啊,这里边还得改一下,大家看这里边我们还用的是input table,我要改成卡夫卡input table。这样的话,解析这个代码弗林就知道了,哦,你这个原来是圆对吧?这是source,那我这里边得是一个consumer consumer消费的主题是sensor,所以我们这里边生产者的主题就应该是三四,对吧?先把这个先定义出来。好,我把它这个放在上面啊,这边先定义出来,然后接下来我再启动一个bash,因为我们还得有一个这个消费者嘛。先把它放出来,我同样进入到这个目录下边啊。呃,卡卡下边,然后呢,我们启动一个消费者,同样卡夫卡console。Consumer啊consumer,然后后边是这个boot strap。
09:01
Service。Book server对吧?Local host 9092,然后杠杠topic,注意这里边我们要消费的数据是什么呢?是flink代码这边,哎,这个写出来的,写出到卡夫卡的数据对吧?那写出到呃,是哪张表的数据写出来呢?是卡夫卡output table写出来对吧?啊,这张表的这个,呃,这个就是我们那个result table的数据写出来啊,写到那个卡夫卡output table里边,卡夫卡output table是跟谁连接的呢?诶,连接的那个主题是SK test,所以这里边我们解析出来flink这边会有一个producer对吧?会有一个卡夫卡的producer,它的主题是think test,所以这里边我们消费的主题也应该是think test,哎,所以是这样的一个过程。好,我们把这个生产者消费者都定义好,那接下来就启动这个卡夫卡pipeline test这个代码,我们来看一看这个真正的数据管道的这个测试啊,能不能成功啊,那接下来我们要发的数据,当然就还是跟之前一样,对吧,直接从这一个这个文件里边读取出来,一条一条发送发送就可以了。
10:17
好,现在这个已经提起来了啊,看起来是一切正常,我们接下来发一条数据。给到诶这里边没有copy过来啊,Copy一下,然后给到这里传进来,哎,这条数据已经发,诶大家看这里边是不是就有一个数据输出啊,对吧?诶这里边就是我输出的这个呢,不是原封不动的数据,因为在这个弗link里边还做了这个转,诶这不是这个了啊在这里啊,做了一个简单的转换操作,做了一个select和filter,对吧,我现在只要SEN1的数据,哎,所以这里边。我既然第一条数据是341嘛,所以当然就输出了啊,那同样我们还可以继续往后做这个测试,后边如果说我这个继续给346的数据的话,诶大家想这里边输出不输出呢?没有输出对吧?啊,这个是正常的啊,因为我们filter只要341嘛,那这个当然是没有的。
11:15
然后后边继续测试,诶这个还是没有copy上啊,347347,诶这个好像是键盘有一点问题啊好,这里边当然还没有输出对吧,这个大家知道啊,三四十三十三四七每一个这个数据肯定是没有输出的啊。这个自然是只有三一的数据在这里边才能够有对应的输出,对吧?来看这里边又又输出了一条啊,然后三色一再来一条的话,这里边又会输出一条对吧?啊,但是这里边它是有一个换行,这个是根据我们这边的那个就是格式化工具对吧?我们用那个格式化工具给它做这个格式化加了一个换行导致的啊,那这个就没关系了啊,对于我们消费数据而言,不需要多考虑这个问题,这就是我们所谓的卡夫卡进,卡夫卡出完整的一个数据管道对吧?这样绕一圈再输出。
12:09
啊,大家在实际项目当中可能经常有这样的需求,这样的应用。
我来说两句