00:00
我们已经用这个flink作为数据源读取数据,做了一个简单的测试啊,那当然有同学可能会发现,如果要在呃实际做这个测试的过程当中,如果说我这么一条一条这么去输入数据的话,这显然就有点太麻烦了,对吧?呃,如果说要这样去做的话,我们我我们当然可以是在这个地方,就是呃,你不停的去去producer里边去把这个数据输入,但是如果说我们是想批量化的读取这个,呃,就是文件已经都放在这儿了,对吧?我们现在是想直接用它来做一个测试,但是呢,我们又不想直接读取文件,因为我们实际项目上线是要用到这个卡夫卡的数据源嘛,所以接下来呢,我们就还是把这个数据源用这里边的卡夫卡,然后接下来呢,诶,我们直接自己写一个类似于脚本程序,或者说我们额外的一个处理程序去把,啊,就是文件里面的数据呢,一条一条读出来,去向卡夫卡里边。
01:00
去写入对吧,就类似于我们自己去写一个卡夫卡producer的一个生产生产程序,然后接下来我们把这个代码写起来,它就会自动的帮我们把所有数据输入做一个测试啊,接下来我们可以看一看这个效果啊啊,那这一部分我们就直接在下边去新建一个代码,呃,当然这个也是一个object,因为类似于一个脚本了,对吧,我这里边直接就叫做卡夫卡producer。U啊,那接下来在这个代码里边,其实我只要执行一个到卡夫卡里边,写入到某一个topic,某一个主题里边的这样一个操作,实现就完事了,对吧?那主要就是两步,呃,就是从文件里边读取数据,然后把它写入,创建一个卡普塔producer,那这个过程我们可能就需要引入卡普塔客户端了,之前我们一开始就引入过,所以接下来呢,哎,我直接把它包成一个方法来写吧,比方说我这个叫做呃,Right to,卡夫卡里边呢,可以传一个参数,就是当前的hot items,这个topic,对吧?那接下来我们主要就是自己实现这样一个方法,整体的流程就是这样,我把这个定义成topic。
02:19
不需要有任何的返回值啊,因为我们只是在用这个卡夫卡producer去发送数据啊,啊,那这里边首先还是把这个该定义的配置项我应该要定义出来对吧?哎,你有一个proper,这个大家如果要是说呃,就是简单去操作的话啊,可以直接把这里边的很多东西抄过来啊,比方说我把这个一抄先复制过来,我们再看一看哪些东西要改就完了嘛,那现在我们不是去创建消费者了,是要一个生产者,所以说消费者组当然就不需要了,那另外我们创建消费者的时候,这里边要的是反序列化的这个工具izer对吧?那但是现在呢,我们要既然是一个生产者producer,那应该要的是sizer,对吧?哎,这里边传这个就完事了,好,然后接下来我们在呃后边去直接把这一个生产者创建出出来producer,我们就要。
03:19
一个卡夫卡producer,这个就需要从诶大家看啊,当前导包的话,我们用的就是卡夫卡clients里边的producer把它引入,当时我们是导入了这个卡夫卡客户端的啊,然后里边本身的数据类型,我们当前key和value,大家看到这里边它这个泛型的定义啊。这里边泛型的定义是有两个,一个K一个value的,所以这里边呢,我们一般写入的时候就全都是字符串嘛,对吧?直接用这个string类型把它写入就可以了,那后边再去传入参数的时候,最简单的一种方式,大家看下面的这个,呃,就是我们传参的这种构造方法啊,最简单的就是传一个properties对吧?啊,前面我们既然都已经定义好了嘛,你要是不在这定义的话,我们就在上面去定义这个zeroizer对吧,其实是一样的啊,那这里边我们直接把上面定义好的proper传入,这样就构建出了producer对吧?啊,然后接下来其实就是读取数据了啊,我们说从文件读取数据,而且接下来是逐行行写入卡卡啊,所以接下来我们首先是要定义一个,呃,就是从文件,就是用调用系统的那个IO的S。
04:43
方法对吧?从文件里边应该要把这个当前的每一行先拿出来,然后再去遍历,这样的话就可以,那我们这里边定义一个,呃,Buffer source对吧?Buffer的source调用当前系统IO的source方法,大家之前还记得skyla里边的这个读取数,读取文件的这个方法对吧?From fair里边要把当前的文件路径传入copy pass传进来,读进来之后那就是一个for循环了,遍历里边的每一行啊,那这里边的每一行我们从这个八分的S里边拿出来,接下来就是要定义一个最后发送的一个数据,Record new,一个producer record。
05:34
生产者生产出来的数据记录对吧?那同样还是k value string都是string类型里边,那就是这里边我们看一下它的这个构造,构造方法啊里边呃,要传很多这个参数,最简单的传参方式呢,诶大家看到就是传一个topic,然后传K和value,这里边我们不定义K的话,直接传value就够了,对吧,最简单两个参数就够了,所以这里边我们非常简单,把这个当前的这个定义啊,直接放在这个hot,哎,呃,当前我们应该是已经传进来的那个topic,所以直接用传进来的topic,另外呢,把当前的line作为value传入,诶这就这就搞定了,对吧?呃,然后这里边大家还要注意一下,就是呃,我们定义的这个producer record这里啊啊,这里我们用这个客户端里边的这个producer record,对吧,我们看一下当前没有去实现的这一个。
06:34
哦,这里边我们的这个V本身应该是string类型对吧?应该实现的就是当前的这一个producer record的这个实现方法,但是这里边还在报错啊,大家发现就是这里边我们呃,这个读取文件之后拿到的是一个buffer source,所以你如果要想拿到string类型的每一行的话,那其实应该要调用它的这个有一个getlines方法对吧?这里边拿到的这这个这才是我们真正的一个string类型的一个able类型对吧?啊,这样的话拿到的这个line就是一个string类型了,所以这样的话就不会报错,要不然的话这个数据类型是过不了的,好,然后我们得到这个当前的这个数据之后,那就可以用producer,前面我们不是定义了producer吗?发送数据就完事了,对吧?调用这个producer的send方法,把record发送出去啊,就这么简单啊啊,那当然最后这个for循环把这个便利完成之后,我们把这个producer关闭close啊,这就是一个完整的流程,好。
07:34
那接下来我们给大家再整体做一个测试,那测试的时候其实就是这边要起消费者对吧,我们把这个消费者先起起来啊,然后那个卡夫卡那边是起着的,这边再起一个生产者,类似于这是一个调用了生产者的一个脚本程序啊,所以这两个代码我们同时把它起起来,大家看一下效果怎么样,好把这个也提起来。大家看到这里边我们这个运行报错了啊,然后这报的错是卡夫卡,这里边这个string zeroizer,这是一个a not an instance对吧,Of sializer,我们要传的是sializer啊,那这里边我们只把那个参数,就这里边我把这个改成了erializer,那里边传的这个类没没改对吧?哎,这里边本来我们应该是要传一个这个theializer的,这前面给的时候还是有点粗心啊,大家注意前面你这一个字段要改,配置项要改,我们后边的这个类当然也得改,对吧,给一个sizer,然后再执行一下。
08:33
重新执行。好,大家看这边代码已经提起来了,然后接下来这部分代码我们没有任何的输出对吧?我们最后的输出是,诶大家看这里边已经在输出了啊,而且我们这边是会输出当前的每一条数据和那个聚合结果,所以大家看是诶隔一段时间有大量的这个聚合结果输出,然后会有对应的一个当前我们窗口的一个统计结果输出,对吧?啊,这已经是在做这个操作了,然后我们会看到这边如果要是说读取这个数据啊,已经读取完毕,已经把这个数据全部写入到卡夫卡的话,其实我们知道这两个代码其实没什么关系,对吧?啊,就是这部分我们可以认为这就是一个脚本了,他只负责诶大家看这边已经跑完了,运行结束了,对吧?他只负责把这个数据读出来,塞到卡夫卡里边去,其实跟我们这里边的这个flink流失处理程序没有任何的直接联系,而我们这里边呢,只是直接从卡夫卡那边去消费数据,对吧?啊,跟那边其实也没关系,所以那边跑完就跑完了,这边我们持续去。
09:37
对,去读取数据就完事了啊,这就是这样的一个测试的过程,我们在工作当中有时候想要用这个快速测试的方法啊,而且不想改我们的这个,呃,文件里边就是当前我们读取文件的时候,不想改我们的源码,这里边本来应该提交到生产环境,应该是基于卡夫卡去消费数据的,你不想改这个源码的话,那我们就写一个脚本去测。
10:01
这是一个实现啊。
我来说两句