00:00
给大家做测试的时候,大家会发现我们如果是卡夫卡数据源的话,这个就必须只能是手动一条一条数据输入了,对吧?那那在有些场景下,我们可能要测试大量数据啊,对吧,我们可能还要测它的这个并发,呃,高并发的这个能力啊,那所以这个时候如果我们一条一条输入,这显然就不行了,那怎么样做这个卡夫卡测试呢?诶,当然是不是,就是我们可以写一个脚本啊,自己写一个脚本,然后把这个数据直接灌到这个卡夫卡里面去,然后生产出来,我们在运行当前的这个代码,是不是就可以直接消费里边的数据了啊,所以这个过程其实还是非常简单啊,那至于脚本用什么来写,这就有各种不同的选择了,我这里边就给大家举一个简单的事例,我们直接就用Java写一个,写一个测试的这个脚本好了,对吧?啊,比方说我就直接在这个ne下边去new一个当前,我这个就叫做卡夫卡,呃,这个producer,对吧?大家知道是不是相当于我要模拟一个卡夫卡的生产者,然后不停的生产数据,对吧?呃,Producer u吧,直接用这样的一个类来表示,然后里边当然是PSVM需要有这样的一个主方法,对吧?呃,我这儿里边可能涉及到一些抛出异常的地方,我还是把这个exception列出来,然后里边呢,呃,我这里边因为考虑到我想还。
01:21
来做一个这个通用的扩展啊,我包装成一个具体的就是写入到卡夫卡里边的方法,对吧,就是包装一个写入卡夫卡的方法,我就把这个public static啊,直接把它叫做void啊,不需要有任何返回,因为是写入外部数据系统嘛,所以这里我直接把它叫做right to卡夫卡。然后它里边可以就是这里边我们可以有一个这个构造方法,可以有一个参数,就是是不是可以把对应的那个topic传进来啊,对吧,你要写入到哪个topic里面去啊,所以我先把这个列举出来,先放在这,呃,哎,这这个这个方法的话,我就直接把这个参数放在这儿啊,String topic。
02:11
然后接下来啊,它里边可能也需要去处理一些对应的这个异常,对吧,我直接把它放在这儿啊,这是我们当前整个的这个大概的结构啊,那前面没方法,这个非常简单了,是不是我直接right to卡卡里边把这个topic写进去就行了,我们当前定义的那个叫hot it对吧?呃,直接去往里边写就行了,那这里边怎么去做这个对应的这个写入呢?首先我应该有对应的那个卡卡的配置,对不对,卡夫卡配置。那么这个配置的话,我就干脆抄一下之前这里的配置吧,呃,但是大家想到这里面的配置其实不全要,因为这里是那个consumer的配置对吧?我这里边其实是想要一个什么的配置啊,而且实想要一个生产者的配置对吧?所以这个消费者组这当然不要了,然后你像什么这个偏移量对吧?呃,最近latest的那个偏移量,这当然不要了,然后这里边的这个序列化,大家看到我这里边用的是那个,呃,里边的这个是stringizer,做的是反序列化,因为大家知道我这里是消费者嘛,那如果说是生产者的话,应该是什么?对sizer嘛,把这个稍微改一下。
03:26
里边这里边这里也注意改一下啊,Stringize。好,把这个做一个做一个配置,然后接下来呢,当然是要,呃,相当于我要去定义一个对,定义一个卡夫卡producer,诶,那这个我们从哪里去找这个卡夫卡producer呢?当然是卡夫卡客户端里面给我们提供的,对不对?哎,这边就是引入卡夫卡客户端的一个原因啊呃,那这里边当然我们的k value6是不是都是string啊,对吧?前面都已经定义好这个序列化的工具啊,都是这个stream啊,所以接下来里边给一个当前的配置项传进来就完事了,对吧?当前我可以把这个就叫做一个KA producer,把它定义出来。
04:13
然后接下来是不是就是我们的数据既然都已经在文件里了,那我的这个脚本就还是从文件读就好了,对吧?啊,就是不用去自动生成了啊,这里边可以去,呃,用这个缓缓冲。方式。读取文本。读取文本数据对吧,就当前的那个文本,呃,File啊,当前的text file,那这里边我们其实就是要new一个buffer的reader对吧?啊,你有一个buffer reader就可以,然后里边我你有一个file reader对吧?把这个创建出来,里边就是我们对应的那个文件路径了啊,我还是把那个做一个copy。既然是写脚本嘛,这个直接写子路径就没关系,对不对。
05:03
放在这儿。呃,然后我把这个先定义出来,比方说这个就叫叫就叫做一个buffer的reader,那么接下来是不是我需要去做一个循环,去读取这个reader里边的每一行啊啊,那接下来我用一个while哦,这样我可以直接在外边呢,先定义一个string当前的这line每一行,对吧,然后接下来我就直接在这儿。做一个判断啊,Line,我直接让他去用这个buffer的reader里边的read line方法对吧,直接读取当前的每一行,然后赋值给当前的这个line,然后大家会想到如果说。当前的这个数啊,呃,如果说它要是不为不为none的话,那接下来我是不是就要处理当前这一行了,那如果为档的话,那就那就外外要突出循环不就完事了吗?对吧?只要做这样的一个判断就完事了啊,这是这个读取这个缓冲,缓冲的方式读取文本流的一个常用的做法啊啊,那所以接下来我们就是你有一个producer,就是我当前得知道当前的这个record是什么,当前的数据记录对吧?那我当前这个要的是一个string string类型k value都是string啊,我现在是不是得传一个topic进来,另外把当前的数据写进来,我的数据是不是就是每一行啊,就是一个line对吧?所以把这个写进来。
06:35
我定义这样一个producer record,然后接下来是不是用那个用当前的那个producer发送数据就可以了,对吧?所以我们呃,用producer发送数据,卡夫卡producer直接呃,Send对吧?调这个方法send,当前的producer record就完事了,当然前面大家也看到了,你做完了这个操作之后,是不是可以把这个producer关掉啊,While循环结束之后卡producer调一个close方法。
07:08
这是一个非常简单的一个,呃,当前一个一个脚本啊,一个批量写入到这个卡夫卡里边数据高并发去写入的一个脚本啊,那我可以把这个运行起来,因为卡夫卡我本来是起着的嘛,所以啊,我可以把这边这个退掉,对吧,这边可以退掉了,然后。这边去写入数据,接下来我把这个代码跑起来,它应该就可以消费数据,对不对啊,同时我在这边启动这两个。代码就可以了,直接就可以做这样一个测试。哦,大家看到现在已经得到这个输出结果了,这跟我们之前那个是不是一样啊,对吧,每五分钟输出一次当前TOP5所有的数据,做一个排序降序输出,诶那这里边你看到这个卡夫卡是不是这个脚本都已经跑完了,说明数据都都读进去,而且都已经灌到卡夫卡里了,对吧?那所以现在如果我们直接开着这边的话,它就会不停的消费数据,然后直接输出我们当前的所有的这个统计结果就完事了啊,它会一直输出,这就是用这个脚本啊,去做一个批量的卡夫卡的测试。
我来说两句