00:00
我们就给大家讲一讲怎么样用一段flink代码把我们处理的结果写入到卡夫卡里边去啊,那我们前面说了,这个flink跟卡夫卡天生一对,连接特别方便啊,那你要连接的话,肯定前提是先得把这个弗林卡夫卡的连接器引入进来,之前其实我们都已经引入了,所以现在直接用就完事儿,对吧?所以接下来我们直接在代码里边做一下实现,我们还是新建一份代码,呃,新建一段代码啊,在这个s test下边去new一个。卡夫卡think卡夫卡think。呃哦,当然了,这个大家如果要是上,就是我们这个test也是也是可以的啊,因为前面我们的很多类在命名的时候都是加了这个test的啊,这个再再加上也是可以的啊,然后我们在这里边把这个可以重命名一下啊,一下好把这个创建出来啊,大家如果看着这个之前这个不舒服的话,我们也可以把上面这个也改了啊。
01:13
File think test对吧?呃,然后里边的这个类也可以改了啊,这个当前的这个单列对象啊,Object也可以改了,然后接下来我们就直接从里边抄了,对吧,前面还是抄,所以说后面我们只是这个think不同嘛,前面还是该怎么样创建环境,创建环境该怎么样读取数据,读取数据对吧?啊,记得把这一个影视转换引入,然后后边我们做完操作之后,不要忘记还有一个execute,把它执行起来,当前我们这个是卡夫卡thinkk test,好,然后这里边我们接下来非常简单,你既然是往卡夫卡里边写嘛,大家知道同样还应该是ADD think,然后这里边呢,来去创建一个这个,呃,实现一个think function,那这个think function当然是卡夫卡连接器给我们已经实现了的,那现在直接new,对吧,这里边new的时候,哎,大家还记得那个弗Li卡夫卡里边啊有。
02:13
这个consumer有producer啊,那现在我们应该又是应该用什么呢。哎,大家仔细一想的话,就会想到之前我们在S里边,因为是要从外部卡夫卡去读取数据,当时我们用的是consumer消费者对吧?那现在我们想要主动往外部去写数据了,那是外部要来读我们的数据对不对?那所以我现在就是直接是一个producer去生产数据,然后哎,放到我们这个卡夫卡里面去,别人可以去消费对吧?哎,所以接下来我们其实要创建的与之对应,之前是consumer,现在是一个producer对吧?啊,应该是这样的一个状态,然后这里边大家看到这个producer呢?呃,相对来讲它的实现的这个构造构造器啊,构造方法就会更加的简单一点,我们看这里边有一个最简单的写法,其实前面就是两个string,对吧,这两个string主要是要什么呢?啊,因为大家知道它这里边其实配置项也不需要传什么,主要就是传broke list,然后给一个这个topic对吧,当前的topic。
03:20
配个名字,后边呢,我们再来一个当前这个,呃,就是序列化的那个工具对吧?啊,再来一个这个呃,Civilization STEM,传一个这玩意儿就完了啊,然后这里边其实我们还为了方便大家知道,呃,你也可以自定义那个呃,就是civilization STEM啊呃,我们这里边你如果要是自定义的话,还需要把这个sensor sensor reading做一个啊,做一个处理对吧?你要定义到底是怎么样把它做序列化,我们这里边简单操作的话可以怎么样呢?之前map的时候我直接调一下to string方法,大家想这个是不是直接把它转成string了,对吧?然后接下来你在做这个转换的时候,这里其实就直接是一个string类型里边我们直接就用那个simple string STEM就完事了,对吧?就像之前在做那个consumer读取的时候一模一样了啊,所以这个就会非常简单,那里边传的参数首先是broke list local host 9092。
04:20
呃,然后后边来一个当前的这个topic,我们来一个呃,Test,然后后边啊,直接去new一个simple string STEM,还是这个东西直接创建出来,大家看就这么简单就搞定了。啊啊,所以接下来我们其实就是还是把这个卡夫卡要打开,然后看一看我们当前的这个效果怎么样,大家看现在卡夫卡已经起着,那所以为了我要看到当前输出的结果,那我其实是应该要在卡夫卡那边,呃,大家想到是不是在卡夫卡那边得创建一个消费者来消费他的数据啊,对吧,我们这边去写那边去消费啊,所以接下来我们还是找到这个卡夫卡下边来去起一个消消费者卡夫卡。
05:16
啊,然后接下来我们把这个该写的写上啊,9092,然后当前的topic必须要一致,当时我们这个是叫think test,对吧,把这个拿过来,好,这样的话我们创呃,创建一个这样的消费者接下来就可以接收这边的数据了,好,那我们来运行一下这个代码,看看效果怎么样啊。大家看这边已经跑完正常退出了啊,为什么退出,那是因为我们这里边是还是读的那个文件数据嘛啊,我们关键看这边儿能不能收到,哎,大家看没有问题对吧?啊,这边这边写这边读取直接收了收到啊没有任何的问题啊,那大家可能会想到通过这样的一个方式,我们前面可以从卡夫卡里边读数据,然后这边可以做处理转换,然后再写入到卡,写回到卡夫卡里边来,所以大家就想到接下来我们其实就可以怎么样呢?是不是就可以拿这个flink做一个中间转换,做一个ETL工具啊对吧?所以你看这个大数据处理工具,它是非常强大的嘛,你如果说拿它来,当然这个有点牛刀,呃,来来来,用来这个杀鸡了是吧?啊,有点大材小用的意思,但是呢,你如果想拿它来做这件事儿也是完全可以的啊,有时候在工作场景里边也还真会这么用,就是说像我们前面讲的啊,不同的门可能所有的数据全混在一起了,然后不同的部门呢,用。
06:47
不同的这个数据,对吧?所以前面可能我们就做一个ETL,呃,也有可能就是别人要做的那个,我不知道我我我不知道该怎么做,我就先做一个分流,我把自己想要的数据都拿出来,然后我把自己的数据,呃做ETL输出到自己要的topic里,那别人的数据呢,原封不动分流之后给他放到另外一个topic里,这样也是可以的,对吧?这就把前面的这个分流和后边我们做的这个往卡卡里边写数据的操作联系起来了啊,那有时候我们就是管这种操作叫做构建一个数据管道,对吧?啊,大家来看一看,那就相当于得是卡夫卡进卡夫卡出,因为实际我们处理这个流失数据的时候,你不可能真的还是从这个,呃,当前的这个文件里面去读取啊,所以接下来我们再定义一个从卡夫卡。
07:37
读取数据,那当然这个读取数据我们就不用重写了,对吧,大家知道就是艾source,然后把那个flink卡卡consumer引入嘛,这个我们就直接用之前里边那个代码直接过来就完事了,我直接把这个过来啊。然后呃,这里边同样该引入的东西都引入,呃,这里边我就不要用这个STREAM3了,对吧,我就我就直接叫做stream好了,然后接下来大家看这里边我还是sensor对吧,就是消费者这里边读的数据是sensor,然后读进来之后呢,后边呃,这里边就不要基于input stream就直接基于这个stream就好了,好接下来做同样的转换,然后我们把它做一个输出对吧?现在的话上面这一部分你也可以住,要住掉不住也行,对吧,因为后面没有用到嘛啊,所以接下来我们看一看它这样的效果是什么样的啊,好,我们先把它运行起来好,然后这里边大家还会注意,我还得另外再起个东西,对吧。
08:39
就这里边,既然你这里面又是卡夫卡进卡卡出嘛,那我现在卡夫卡这边是已经有一个消费者了,但是呢,呃,你还没有这个进的东西呢,哎,我们还得有一个生产者,所以接下来我们还是到这个。到卡夫卡下边来啊。
09:00
呃,然后接下来还是去创建一个producer卡夫卡consumer,呃,Producer,然后接下来把这个该写的写进来,对吧,Broke list local host 9092,呃,然后接下来这个topic,大家要注意一下,当前的topic应该是我们这里边要去消费的这个topic,对吧?啊,现在应该是三好把这个创建出来。然后接下来好,大家知道,接下来我们就是一行一行数据输入,然后看这个效果了,对吧,输入的数据还是我们原始的逗号分割的,这样的每一个字段一个字段,一行一行,我们看一下这个效果啊,放在这里。诶,大家看到看到上面跳变了对吧?诶,我这里边输入了一条数据,这里边真的就输出了一条数据,如果说大家还想在这里边,就是我们的这个flink代码里边有一个打印输出的话,你可以看到它中间转换的过程,对吧?你可以把中间转换过程再做一个打印输出,所以大家看到这个过程其实就是从卡夫卡producer这边进,然后经过flink的转换,然后再通过flink这边写入到卡夫卡另外一个topic下边卡夫卡出,对吧?而且大家看到进来的时候是原封不动的这个逗号分割的一个一个字段出来的时候就已经包装成了sensor reading样例类类型啊,所以这个就是一个完整的数据管道的测试过程,大家可以下来之后把这个再练习一下。
我来说两句