00:00
经过前节课的学习啊,我们终于将消费者相关的理论部分已经讲完了,那接下来呢,我们开始进入代码实操。那首先我们来看第一个案例呢,就是独立的消费者去订阅某一个主题的数据。那详细看一下需求,需求这样的说,我们需要创建一个独立的消费者。然后呢,他来消费负二的主题当中的数据,那负二的主题你要注意负二主题呢,它是三个分区。那三个分区的数据都由这一个消费者来进行一个消费,哎,就满足这样的一个需求。那这里面有一个注意事项啊,什么注意事项呢?说在消费者API代码中必须配置消费者主ID,即使你是一个独立的消费者也得配。那为什么命令行的方式的时候,我们不需要配置对应的这个消费者主ID呢?其实它是自动的帮你随机配了一个消费者主ID啊,这个地方一定要注意啊,一会儿写代码的时候啊,我们来看一下。
01:01
那行,首先呢,我们这里面创建一个报名。然后这里面。开启代码,那么我们再把这个类名也复制过来啊,也就说自定义一个消费者。那大家思考,我创建这个消费者,我要做几件事?几件事啊?回忆一下这个生产者部分对吧,第一个。创建一个消费者好。那下面。下面正常应该是消费数据对吧?哎,那其实呢,我们第二步呢,是叫订阅主题,比如说你要消费哪一个主题数据,那我们这里面消费的是不是这个first。First的主题啊。那我要订阅他之后,那最后一步就是消费数据。就完了。那这里面如果大家是高手的话,是不还应该补上一步叫零叫配置。
02:00
是这样吗?行,那下面呢,我们就一步步来做哈。这个创建一个消费者。哎,卡法肯修膜,看一下这个卡法肯修膜里面是不是对应的这个K和Y6啊。那思考一下这个K和Y6应该是什么?跟我们生产者是不是一样啊,哎,这里面传过来第一个值是不是应该是实际类型,第二个呢,是不是也应该是实类型,比如说你传过来数据,你还记得生产者发送数据的时候指定一个key,哎,然后之后呢,Value指定的,比如说hello。知道吧,哎,Key呢,一般是空值嘛,啊,那它的类型都是string类型,那行,那这里面CTRL加P,那这里面需要传入的参数是purpose或者集合,那好,那这里面呢,很显然我们是需要一个purpose。需要啥你就用啥。来。拿到它之后,第一时间把这个参数给过去。然后呢,点包走。
03:01
哎,卡吧,肯西这样就来了,来了之后下面我们需要配置一下这里面的参数,那哪些参数是必须配置的呢?思考一下,第一个你是不得连接上对应的这个卡瓦集群呢?连接上之后还有一个叫反序列化。还记得我们之前在生产者的时候,有序的话,那这边必然得有反序的化,好,那我们先写一下啊第一个链接。呃,点put put的话呢,那这个呢,就是肯修膜。跟这个第二。点什么呢?诶这里面第一个呢,就是连接上集群boot server哈,好拿过来,那你确认一下是不是CTRL,按住CTRL键点进去。点进去。是不是就是它呀,哎,就是这个boowap server哈,拿过来。那它的话呢,我们可以写上电子参数。Hiop 1029092,如果想再连一个的话,你的生长环境当中,那么就1039092,保证它对应的可靠性,这就OK了。
04:03
这是连接上,那下面呢,进行反序列化,点put。仍然是肯肯点key,你看这里面呢,就是key的一个反序列化类知道吗?嗯,好拿过来。那我们这用的就是string。这时候注意了,你看这里面有个序列化和反序列化,现在呢,应该调用的是哎,反序列化啊反序列化,然后点class.get name。诶,这样就OK了。那再来一个八点不等。看正法肯这个点Y流,那Y流调用的仍然是这个反序列化类。好。是这样。点class.get name别写错了就可以了,那现在呢,这些必备项我就完事了,那下面干嘛呢?来开始写这个订阅主题,那订阅主题。它点点什么呢。哎。那这个呢,就是订阅,那订阅你看这里面传入的参数是不是一个集合呀,那很显然我是不是可以同时间消费多个主题的数据啊。
05:07
比如说你这里面有一个for的主题,那我还可以来一个second,那我都能够由这一个消费者来进行一个消费啊,要注意。那我们就定义一个啊,那需要一个集合,需要啥就来啥,又一个。啊。是类型。两包这个呢叫topic,存储多个topic,那topic里面得添加数据,那就是艾,那我把这个first主题添加进去。这就OK了。拿过来。现在呢,就订阅了一个主题。这是订阅啊。好订阅主题,那接下来最后一步就是消费数据,那消费数据呢,就是这个外循环。然后住。就说一直使循环消费。那么这里面要想退出这个while处这种循环一般怎么办啊,在生长环境当中啊,你想退出的话就是if,比如说这一个这个flag标志位flag啊,如果说它等于等于啊处。
06:08
那怎么办?哎,我这里面就可以这个break。啊,不意退出啊,然后正常情况下呢,他们一直直都是false,那这个flag标记呢,你可以在其他这个线程里面啊,来控制这个flag的值啊,那就可以了啊正常这样那行,那下面呢,我们就开始一直消费,那消费的话,我这块应该怎么写呢?嗯,拿到这个卡法consumer点。我要想消费出去,其实是一个拉的动作,比如说。那拉的话里面传入了一个时间,那就是你每批次与批次之间的一个间隔时间,我说第一批次拉完之后,我第二批什么时候拉,哎,那这个间隔时间,那间隔时间间隔多少呢?啊,Reason time,那我这是秒吧,啊一秒那就是一秒啊拉取一次啊点忘返回。那这一秒呢,我就拉回来consumer record这么多数据。那拉回来这波数据我想看一看。
07:02
怎么办?哎,那就打印到控制台呗,那对它进行一个循环,点for啊方循环便利。那便利的话,System out,然后将它放进去,那这样呢,就是循环打印你消费到的数据,哎,这就OK了。那这个行不行呢?撤一下呗,是不是,哎,我们先把这个运行一下,看行不行,那你回忆一下之前我讲这个的时候的注意事项。比如说消费者代码的注意事项。很显然,眉形所有异常。看一下这款。说有must provide一个固定的什么group ID,这个值必须得有啊,一定要注意啊,这个参数必须得配,那我们就给它配一下啊。在这里面。配置消费者。主ID你看我现在呢,是一个消费者也得进行一个配置。The。
08:01
啃熊猫,啃这点点什么呢?Group ID啊,就是他那这里面我给他一个这个名字是我们用户随便起的啊,比如说就是test。那这样呢,我就会创建一个test的一个消费者主。那行有了它之后呢,我再次执行。你看现在这个消费者哎,就已经运行起来了,那运行起来他没有消费到数据啊,因为呢,你生产者还没有给他发这样吧,好,那下面呢,我们在命令行当中,哎,向这个first主题发送一波数据哈。那来到这里面我们发送数据啊B卡不卡。很臭。Producer杠杠连接上boot。Stop。Server。嗨,杜跑1029092。尴尬。他哥。First向这个主题里面发送数据,那比如说我发送一条啊好了。
09:03
哎,你看这边呢,就能够收到一批数据。在哪呢?是Y6 hello,啊,那就可以了,那当然了,除了这个通过民航发送,我们也可以通过我们之前写的生产者代码进行一个发送。打一打。这行。这面发送完毕,我们来看一下这边有没有收到数据,是不是也收到对应的数据了,哎,对应的是艾特硅谷零,一直到艾特硅五四啊。再就是呢,通过API的方式来去消费某一个主题的数据哈。
我来说两句