00:00
好,三零号我们是把消费者对应的初始化相关操作已经做完了,也就说这块我们又来一个卡法consumer,那接下来呢,就用这个consumer膜来干什么事呢?进行一个主题订阅。来主题订阅,那相当于我们之前写的代码的什么位置呢?是这个位置,比如说订阅你要消费哪几个主题啊。那回到这,那我们就进来看一看,这个订阅主题里面他干了什么事啊,那首先传进来的是一个topic对吧?哎,放到这个集合里面去,然后订阅。接下来继续往里面点。点进来之后,哎,看到这个位置。你刚才说要订阅的这个主题啊,如果说它会空。说要订阅的主题。如果为空那怎么办?那直接就抛异常了。直接跑一场。
01:00
那再往下看。哎,继续判断。判断什么呢说哎,我要定于这个主题啊,如果说是空的。哎,这个空啊。哎,你说里面没有数据,那么怎么办?诶,那么这里面告诉他说,哎没有进行一个注册,那否则的话才进行正常处理,你说这里面才是正常的处理操作。啊,那它正常怎么办呢?它是循环便利所有你订阅的这个topic思啊进行判断还是判断什么,仍然判断它是否为空,如果为空仍然抛一场。哎,也就是说他对这个呃,Topic克斯啊,为空这事呢,啊教验呢,是非常非常严格的哈。啊,那下边往下走,那往走到哪呢?走到这个位置。这行代码核心代码。
02:01
是进行订阅主题。说if这个如果返回为true啊,True的话呢,那这个原数据呢,就会更新对应的这个新的topic,也就是说更新你定义的新的主题信息,那什么意思呢?其实在这个过程当中,它是判断。你是否需要更新订阅的主题?来这样操作,同时呢,这里面还注册了一个监听器。监听器,那这个监听器是干什么用的,这个监听器啊,哎,主要来监听负载均衡的,你看啊,回忆一下,我们这是对应的消费者主,那消费者组当中如果有一个消费者挂掉了。那挂掉了是不是得通知对应的其他消费者啊,那怎么通知啊,哎,那就通过这个监听器,我监控其他节点是否OK,哎,如果不OK的话,那我就接收,哎,再给我分配的新的任务,比如说额外再增加的任务呢,那我也得进行一个能够处理啊,比如这呢有一个监听器啊。
03:08
主要用来负载均衡的。那我们进去看一看啊,进来进来之后首先第一个就上。注册一个reb。Listener。注册。负债。均衡监听器。哎,干这个事,这一个呃,注册完之后,那就等待,如果说啊有topic消费信息相关的变化的话,那这块呢,就能够监听得到啊,进行修改,那下面呢,这块呢是叫auto topics,那这块呢是按照主题进行一个自动的订阅啊。按照主题。自动订阅模式,哎,自动进行一个分配,那下面这块也就是说判断你是否需要更改订阅的主题判断。
04:00
是否需要更改?订阅的主题,哎,也就这块,那看看呗,进去把你这个topic克斯传进来,传进来之后就是它,那么如果你传进来的topic克斯跟你以前订阅的这个topic相同,那这块呢,我就不需要改变。是吧,这块。如果传入的topics。和以前。订阅的主题一致。那就不需要更改对应的这个topic主题哈。那就不需要。更改。对应。订阅的主题,哎,比如说这个意思啊,那这块一般情况下呢啊,如果说没有发生这种re balance负载再平衡的话,那这块呢,不会起作用哈,那请往后退。再往回推,推到这,那如果说这块诶发生了这个啊,Re balancell,也就是说再进行操作,那么下边呢,这块呢,就是更新对应的这个订阅请求。
05:06
一会儿呢。更新。订阅信息,哎,这就OK了啊,这样,那这就是整个这个订阅操作。
我来说两句