00:00
接下来呢,我们来看一下第三章消费者源码。那么还是老规矩,在讲消费者源码之前,我们快速的回忆一下消费者他干了哪些事儿啊?双击打开。首先呢,我们是生产者,把数据啊发送到卡法集群,接下来消费者组闪亮登场,好,那么消费者组登场之后,首先做的第一件事就是选择对应的这个coortinator。那么这个宽由谁决定呢?由这个消费者主ID的哈希扣的值,哎,对这个50群模,那50是谁呢?50是对应的这个系统的消费者up赛的这个主题,它的分居数,那默认呢,都是50好。那么接下来就是这个哈希勾的值啊,假如说是一,那一对这个50求模的话,那就等于一,那么我们接下来就来看一下这个一号分区,它在哪一个节点上,在哪个节点呢?诶,巧了,正好在这个博科一这个节点上。那在这个节点上,那我们后续就会选择这个节点上的coordinator,跟我们其他的这个消费者进行一个交互通讯,哎,这样来选择啊,那选择完之后,接下来说每一个这个消费者,诶就要发送这个噪音group,诶跟这个狂进行一个通讯。
01:13
那通讯汇报情况之后,那下面这个coordinator就会从这里面选择出来一个消费者,作为这个消费者leader。那选择出来之后,这个CD把所有的消息诶发送给这个消费者leader,那由他呢来制定对应的这个消费计划,那好,那他再把这个消费计划呢,回馈给对应的这个coordinator,由这个coordinator把这个方案分发给所有的消费者。那所有的消费者呀,就按照对应的这个消费方案进行一个消费啊好。那在这个过程当中涉及到了几个参数,第一个呢,就是每个消费者啊,都会跟这个跨店内头进行一个心跳同步,那默认的这个心跳时间呢是三秒,那好,那还有一个呢,就是一旦超过45秒,那就认为这个消费者啊跟整个集群脱离了这个呃,消费关系就会把它退出,那退出之后他的任务呢,就会分配给其他消费者来进行一个啊完成,那这个过程呢,就叫这个触发再平衡。
02:17
那还有一种条件处罚再平衡,那就是从这个消费者啊,从这边拉取过来一波数据之后,那处理的时间超过了五分钟,那么他也会退出,诶他把这个任务呢,再分配给哎,别人来进行一个完成,那这样呢,呃,就会也是触发再平衡啊。那行,那这个呢是消费者主初始化啊过程啊,那下面往下看,那初始化完毕之后,那接下来呢,我们就来看一下这个消费者主啊,这个具体的一个工作流程啊。首先打开,那左侧呢,这边是对应的这个服端,右侧呢是对应的这个客户端,也是消费者主,好那么消费者主啊,要想跟这个服务端通讯,那他必须得有一个客户端对象,那这个客户端对象呢,就是这个consumer network。
03:03
那创建它之后,那它这里面就会初始化这么多参数,那比如说第一个参数就是每批次最小抓取的数据大小,哎,默认呢是一个直径,比如说最少我可以抓一个直径。那么我最多能抓多少呢?最多是这个,最多我可以抓取50兆的数据,来同志们来拉取,那还有一个那一批数据最小值未达到的超时时间啊。什么意思呢?比如说这里面啊,我现在呢,不改成不是一个字节了,我改成十个字节,那十个字节这边有可能是五个字。对吧,啊,一共就五个字节的数据,那我这样来拉取,那你说哎十个字没到啊,那没到怎么办,一直等着吗?哎,不是,这里面还有一个超时时间,比如说你虽然说是五个时节,但是呢,哎,我等到这个500毫秒之后,那我仍然能把你这五个字节的数据啊拉过来啊是这样的一个配合参数。那下面呢,调用这个S方法,哎,发送,发送之后通过回调函数on success,呃,那如果成功的话,把数据呢,就拉取过来,那拉取过来之后它会放到哪里呢?一个叫complete fe,哎,这是一个队列,或者说是缓存,那每批次的往里面一批批的放,放完之后,那接下来他又会调用这个fech的record这个方法,诶来把这里的数据呢,哎,再进行重新的一个整合加工,我从这里面每批次呢,诶拉取的是500条数据啊,最多啊,最多是500条,那当然你要说一条的话,我也能够啊拉过来哈,直到拉美为止啊。
04:33
那500条数据拉过来之后呢,就要经过反序列化进行处理,然后拦截器,然后呢是哎,这个最终的用户啊,来消费对应的这个数据哈,再一个呢,就是整个这个消费流程啊,这之前咱们讲过了啊,所以说这块呢,讲的稍微要快一些啊,如果呃觉得快的同学呢,可以看一下啊以前的这个课程内容啊。
我来说两句