前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 源码分析 —— Message 拉取与消费(下)

RocketMQ 源码分析 —— Message 拉取与消费(下)

作者头像
芋道源码
发布2020-05-07 11:24:38
1.6K0
发布2020-05-07 11:24:38
举报
文章被收录于专栏:芋道源码1024芋道源码1024

1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:
    • 在大多数场景下使用。
    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
  • PullConsumer

本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

PushConsumer手绘图.png

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断不断不断Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStoreConsumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 NamesrvBroker 的 API调用,提供给 ProducerConsumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

代码语言:javascript
复制
  1: public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 创建订阅数据
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 通过心跳同步Consumer信息到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }
  • 说明 :订阅 Topic
  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

代码语言:javascript
复制
  1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
  2:     String subString) throws Exception {
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 处理订阅表达式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\\|\\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25: 
 26:     return subscriptionData;
 27: }
  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

代码语言:javascript
复制
  1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }
  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

RebalanceService&PushConsumer分配队列

RebalanceService

代码语言:javascript
复制
  1: public class RebalanceService extends ServiceThread {
  2: 
  3:     /**
  4:      * 等待间隔,单位:毫秒
  5:      */
  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval", "20000"));
  9: 
 10:     private final Logger log = ClientLogger.getLog();
 11:     /**
 12:      * MQClient对象
 13:      */
 14:     private final MQClientInstance mqClientFactory;
 15: 
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19: 
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23: 
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28: 
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31: 
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }
  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。
  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发:
    • 第 25 行 等待超时,每 20s 调用一次。
    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。
    • Broker 通知 Consumer 加入 或 移除时,Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

详细解析见:MQClientInstance#doRebalance(...)。

MQClientInstance#doRebalance(...)

代码语言:javascript
复制
  1: public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }
  • 说明 :遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
  • 疑问:目前代码调试下来,consumerTable 只包含 Consumer 自己。?有大大对这个疑问有解答的,烦请解答下。
  • 第 6 行 :调用 MQConsumerInner#doRebalance(...) 进行队列分配。DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...) 详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。

DefaultMQPushConsumerImpl#doRebalance(...)

代码语言:javascript
复制
  1: public void doRebalance() {
  2:     if (!this.pause) {
  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  4:     }
  5: }
  • 说明:执行消息队列分配。
  • 第 3 行 :调用 RebalanceImpl#doRebalance(...) 进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。

RebalanceImpl#doRebalance(...)

代码语言:javascript
复制
  1: /**
  2:  * 执行分配消息队列
  3:  *
  4:  * @param isOrder 是否顺序消息
  5:  */
  6: public void doRebalance(final boolean isOrder) {
  7:     // 分配每个 topic 的消息队列
  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  9:     if (subTable != null) {
 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 11:             final String topic = entry.getKey();
 12:             try {
 13:                 this.rebalanceByTopic(topic, isOrder);
 14:             } catch (Throwable e) {
 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 16:                     log.warn("rebalanceByTopic Exception", e);
 17:                 }
 18:             }
 19:         }
 20:     }
 21:     // 移除未订阅的topic对应的消息队列
 22:     this.truncateMessageQueueNotMyTopic();
 23: }
 24: 
 25: /**
 26:  * 移除未订阅的消息队列
 27:  */
 28: private void truncateMessageQueueNotMyTopic() {
 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
 31:         if (!subTable.containsKey(mq.getTopic())) {
 32: 
 33:             ProcessQueue pq = this.processQueueTable.remove(mq);
 34:             if (pq != null) {
 35:                 pq.setDropped(true);
 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
 37:             }
 38:         }
 39:     }
 40: }
  • #doRebalance(...) 说明 :执行分配消息队列。
    • 第 7 至 20 行 :循环订阅主题集合( subscriptionInner ),分配每一个 Topic 的消息队列。
    • 第 22 行 :移除未订阅的 Topic 的消息队列。
  • #truncateMessageQueueNotMyTopic(...) 说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。

RebalanceImpl#rebalanceByTopic(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • #rebalanceByTopic(...) 说明 :分配 Topic 的消息队列。
    • 第 21 至 40 行 :获取 Topic 对应的消息队列和消费者们,并对其进行排序。因为各 Consumer 是在本地分配消息队列,排序后才能保证各 Consumer 顺序一致。
    • 第 42 至 61 行 :根据 队列分配策略( AllocateMessageQueueStrategy ) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。
    • 第 63 至 72 行 :更新 Topic 对应的消息队列。
    • 第 3 至 19 行 :广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列。
    • 第 20 至 74 行 :集群模式( CLUSTERING ) 下,分配 Topic 对应的部分消息队列。
  • #updateProcessQueueTableInRebalance(...) 说明 :当分配队列时,更新 Topic 对应的消息队列,并返回是否有变更。
    • 第 132 至 135 行 :顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 137 行 :移除消息队列的消费进度。
    • 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
    • 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求
    • 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
    • 第 108 至 120 行 :队列拉取超时,即 当前时间 - 最后一次拉取消息时间 > 120s ( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面**#新增队列逻辑#**可以重新加入新的该消息队列。
    • 第 93 至 126 行 :移除不存在于分配的消息队列( mqSet ) 的 消息处理队列( processQueueTable )。
    • 第 128 至 158 行 :增加 分配的消息队列( mqSet ) 新增的消息队列。
    • 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。

RebalanceImpl#removeUnnecessaryMessageQueue(...)

RebalancePushImpl#removeUnnecessaryMessageQueue(...)
代码语言:javascript
复制
  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     // 同步队列的消费进度,并移除之。
  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
  5:     // TODO 顺序消费
  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
  8:         try {
  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 10:                 try {
 11:                     return this.unlockDelay(mq, pq);
 12:                 } finally {
 13:                     pq.getLockConsume().unlock();
 14:                 }
 15:             } else {
 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 17:                     mq, //
 18:                     pq.getTryUnlockTimes());
 19: 
 20:                 pq.incTryUnlockTimes();
 21:             }
 22:         } catch (Exception e) {
 23:             log.error("removeUnnecessaryMessageQueue Exception", e);
 24:         }
 25: 
 26:         return false;
 27:     }
 28:     return true;
 29: }
  • 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
  • 第 2 至 4 行 :同步队列的消费进度,并移除之。
  • 第 5 至 27 行 :顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)
代码语言:javascript
复制
  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
  4:     return true;
  5: }
  • 说明 :移除不需要的消息队列相关的信息,并返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(...)

代码语言:javascript
复制
  1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
  2:     for (PullRequest pullRequest : pullRequestList) {
  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
  5:     }
  6: }
  • 说明 :发起消息拉取请求。该调用是PushConsumer不断不断不断拉取消息的起点
DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
代码语言:javascript
复制
  1: public void executePullRequestImmediately(final PullRequest pullRequest) {
  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
  3: }
  • 说明 :提交拉取请求。提交后,PullMessageService 异步执行非阻塞。详细解析见:PullMessageService。

AllocateMessageQueueStrategy

AllocateMessageQueueStrategy类图

AllocateMessageQueueAveragely
代码语言:javascript
复制
  1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
  2:     private final Logger log = ClientLogger.getLog();
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  6:         List<String> cidAll) {
  7:         // 校验参数是否正确
  8:         if (currentCID == null || currentCID.length() < 1) {
  9:             throw new IllegalArgumentException("currentCID is empty");
 10:         }
 11:         if (mqAll == null || mqAll.isEmpty()) {
 12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 13:         }
 14:         if (cidAll == null || cidAll.isEmpty()) {
 15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
 16:         }
 17: 
 18:         List<MessageQueue> result = new ArrayList<>();
 19:         if (!cidAll.contains(currentCID)) {
 20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
 21:                 consumerGroup,
 22:                 currentCID,
 23:                 cidAll);
 24:             return result;
 25:         }
 26:         // 平均分配
 27:         int index = cidAll.indexOf(currentCID); // 第几个consumer。
 28:         int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
 29:         int averageSize =
 30:             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
 31:                 + 1 : mqAll.size() / cidAll.size());
 32:         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
 33:         int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。
 34:         for (int i = 0; i < range; i++) {
 35:             result.add(mqAll.get((startIndex + i) % mqAll.size()));
 36:         }
 37:         return result;
 38:     }
 39: 
 40:     @Override
 41:     public String getName() {
 42:         return "AVG";
 43:     }
 44: }
  • 说明 :平均分配队列策略。
  • 第 7 至 25 行 :参数校验。
  • 第 26 至 36 行 :平均分配消息队列。
    • [ 0, mod )mqAll.size() / cidAll.size() + 1。前面 modConsumer 平分余数,多获得 1 个消息队列。
    • [ mod, cidAll.size() )mqAll.size() / cidAll.size()
    • 第 27 行 :index :当前 Consumer 在消费集群里是第几个。这里就是为什么需要对传入的 cidAll 参数必须进行排序的原因。如果不排序,Consumer 在本地计算出来的 index 无法一致,影响计算结果。
    • 第 28 行 :mod :余数,即多少消息队列无法平均分配。
    • 第 29 至 31 行 :averageSize :代码可以简化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
    • 第 32 行 :startIndexConsumer 分配消息队列开始位置。
    • 第 33 行 :range :分配队列数量。之所以要 Math#min(...) 的原因:当 mqAll.size() <= cidAll.size() 时,最后几个 Consumer 分配不到消息队列。
    • 第 34 至 36 行 :生成分配消息队列结果。
  • 举个例子:

固定消息队列长度为4

Consumer * 2 可以整除

Consumer * 3 不可整除

Consumer * 5 无法都分配

消息队列[0]

Consumer[0]

Consumer[0]

Consumer[0]

消息队列[1]

Consumer[0]

Consumer[0]

Consumer[1]

消息队列[2]

Consumer[1]

Consumer[1]

Consumer[2]

消息队列[3]

Consumer[1]

Consumer[2]

Consumer[3]

AllocateMessageQueueByMachineRoom
代码语言:javascript
复制
  1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
  2:     /**
  3:      * 消费者消费brokerName集合
  4:      */
  5:     private Set<String> consumeridcs;
  6: 
  7:     @Override
  8:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  9:         List<String> cidAll) {
 10:         // 参数校验
 11:         List<MessageQueue> result = new ArrayList<MessageQueue>();
 12:         int currentIndex = cidAll.indexOf(currentCID);
 13:         if (currentIndex < 0) {
 14:             return result;
 15:         }
 16:         // 计算符合当前配置的消费者数组('consumeridcs')对应的消息队列
 17:         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
 18:         for (MessageQueue mq : mqAll) {
 19:             String[] temp = mq.getBrokerName().split("@");
 20:             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
 21:                 premqAll.add(mq);
 22:             }
 23:         }
 24:         // 平均分配
 25:         int mod = premqAll.size() / cidAll.size();
 26:         int rem = premqAll.size() % cidAll.size();
 27:         int startIndex = mod * currentIndex;
 28:         int endIndex = startIndex + mod;
 29:         for (int i = startIndex; i < endIndex; i++) {
 30:             result.add(mqAll.get(i));
 31:         }
 32:         if (rem > currentIndex) {
 33:             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
 34:         }
 35:         return result;
 36:     }
 37: 
 38:     @Override
 39:     public String getName() {
 40:         return "MACHINE_ROOM";
 41:     }
 42: 
 43:     public Set<String> getConsumeridcs() {
 44:         return consumeridcs;
 45:     }
 46: 
 47:     public void setConsumeridcs(Set<String> consumeridcs) {
 48:         this.consumeridcs = consumeridcs;
 49:     }
 50: }
  • 说明 :平均分配可消费的 Broker 对应的消息队列。
  • 第 7 至 15 行 :参数校验。
  • 第 16 至 23 行 :计算可消费的 Broker 对应的消息队列。
  • 第 25 至 34 行 :平均分配消息队列。该平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是将多余的结尾部分分配给前 remConsumer
  • 疑问:使用该分配策略时,ConsumerBroker 分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。
AllocateMessageQueueAveragelyByCircle
代码语言:javascript
复制
  1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
 2:     private final Logger log = ClientLogger.getLog();
 3: 
 4:     @Override
 5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
 6:         List<String> cidAll) {
 7:         // 校验参数是否正确
 8:         if (currentCID == null || currentCID.length() < 1) {
 9:             throw new IllegalArgumentException("currentCID is empty");
10:         }
11:         if (mqAll == null || mqAll.isEmpty()) {
12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
13:         }
14:         if (cidAll == null || cidAll.isEmpty()) {
15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
16:         }
17: 
18:         List<MessageQueue> result = new ArrayList<MessageQueue>();
19:         if (!cidAll.contains(currentCID)) {
20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21:                 consumerGroup,
22:                 currentCID,
23:                 cidAll);
24:             return result;
25:         }
26: 
27:         // 环状分配
28:         int index = cidAll.indexOf(currentCID);
29:         for (int i = index; i < mqAll.size(); i++) {
30:             if (i % cidAll.size() == index) {
31:                 result.add(mqAll.get(i));
32:             }
33:         }
34:         return result;
35:     }
36: 
37:     @Override
38:     public String getName() {
39:         return "AVG_BY_CIRCLE";
40:     }
41: }
  • 说明 :环状分配消息队列。
AllocateMessageQueueByConfig
代码语言:javascript
复制
  1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
  2:     private List<MessageQueue> messageQueueList;
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  6:         List<String> cidAll) {
  7:         return this.messageQueueList;
  8:     }
  9: 
 10:     @Override
 11:     public String getName() {
 12:         return "CONFIG";
 13:     }
 14: 
 15:     public List<MessageQueue> getMessageQueueList() {
 16:         return messageQueueList;
 17:     }
 18: 
 19:     public void setMessageQueueList(List<MessageQueue> messageQueueList) {
 20:         this.messageQueueList = messageQueueList;
 21:     }
 22: }
  • 说明 :分配配置的消息队列。
  • 疑问 :该分配策略的使用场景。

5、PushConsumer 消费进度读取

RebalancePushImpl#computePullFromWhere(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :计算消息队列开始消费位置。
  • PushConsumer 读取消费进度有三种选项:
    • CONSUME_FROM_LAST_OFFSET :第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_FIRST_OFFSET :第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_TIMESTAMP :第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费

[PullConsumer] RebalancePullImpl#computePullFromWhere(...)

暂时跳过。?

6、PushConsumer 拉取消息

DefaultMQPushConsumerImpl拉取消息

PullMessageService

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • #executePullRequestLater(...) :第 26 至 40 行 :提交延迟拉取消息请求。
  • #executePullRequestImmediately(...) :第 42 至 53 行 :提交立即拉取消息请求。
  • #executeTaskLater(...) :第 55 至 63 行 :提交延迟任务
  • #pullMessage(...) :第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。
  • #run(...) :第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue ),进行消息拉取。

DefaultMQPushConsumerImpl#pullMessage(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • #pullMessage(...) 说明 :拉取消息。
    • 执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
    • 当发起请求产生异常时,提交延迟拉取消息请求。对应 Broker 处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。
    • 第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
    • 第 9 行 :设置消息处理队列最后拉取消息时间。
    • 第 11 至 18 行 :Consumer 未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。
    • 第 20 至 25 行 :Consumer 处于暂停中,不进行消息拉取,提交延迟拉取消息请求。
    • 第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
    • 第 39 至 49 行 :Consumer并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。
    • 第 50 至 70 行 :顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 72 至 78 行 :Topic 对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。
    • 第 222 至 224 行 :判断请求是否使用 Consumer 本地的订阅信息( SubscriptionData ),而不使用 Broker 里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。
    • 第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
    • 第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
    • 第 237 至 255 行 :
  • PullCallback :拉取消息回调:
    • 第 90 至 139 行 :拉取到消息( FOUND ) :
    • 第 140 至 149 行 :没有新消息( NO_NEW_MSG ) :
    • 第 150 至 159 行 :有新消息但是不匹配( NO_MATCHED_MSG )。逻辑同 NO_NEW_MSG
    • 第 160 至 189 行 :拉取请求的消息队列位置不合法 (OFFSET_ILLEGAL)。
    • 第 91 至 93 行 :设置下次拉取消息队列位置。
    • 第 95 至 97 行 :统计。
    • 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。
    • 第 106 至 108 行 :统计。
    • 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。
    • 第 113 至 118 行 :提交消费请求到 ConsumeMessageService。详细解析见:ConsumeMessageConcurrentlyService。
    • 第 120 至 126 行 :根据拉取频率( pullInterval ),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。
    • 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。
    • 第 142 行 :设置下次拉取消息队列位置。
    • 第 145 行 :更正消费进度。详细解析见:#correctTagsOffset(...)
    • 第 148 行 :提交立即拉取消息请求。
    • 第 175 至 178 行 :更新消费进度,同步消费进度到 Broker
    • 第 181 行 :移除消费处理队列。
    • 疑问:为什么不立即移除???
    • 第 164 行 :设置下次拉取消息队列位置。
    • 第 167 行 :设置消息处理队列为 dropped
    • 第 169 至 188 行 :提交延迟任务,进行队列移除。
    • 第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
    • 第 89 至 192 行 :处理拉取状态结果:
    • 第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
  • #correctTagsOffset(...) :更正消费进度。
    • 第 258 至 261 行 :当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。

PullAPIWrapper#pullKernelImpl(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明?。
  • 第 34 至 43 行 :获取 Broker 信息(Broker 地址、是否为从节点)。
    • #recalculatePullFromWhichNode(...)
    • #MQClientInstance#findBrokerAddressInSubscribe(...)
  • 第 45 至 78 行 :请求拉取消息
  • 第 81 行 :当 Broker 信息不存在,则抛出异常。
PullAPIWrapper#recalculatePullFromWhichNode(...)
代码语言:javascript
复制
  1: /**
  2:  * 消息队列 与 拉取Broker 的映射
  3:  * 当拉取消息时,会通过该映射获取拉取请求对应的Broker
  4:  */
  5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
  6:     new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
  7: /**
  8:  * 是否使用默认Broker
  9:  */
 10: private volatile boolean connectBrokerByUser = false;
 11: /**
 12:  * 默认Broker编号
 13:  */
 14: private volatile long defaultBrokerId = MixAll.MASTER_ID;
 15: 
 16: /**
 17:  * 计算消息队列拉取消息对应的Broker编号
 18:  *
 19:  * @param mq 消息队列
 20:  * @return Broker编号
 21:  */
 22: public long recalculatePullFromWhichNode(final MessageQueue mq) {
 23:     // 若开启默认Broker开关,则返回默认Broker编号
 24:     if (this.isConnectBrokerByUser()) {
 25:         return this.defaultBrokerId;
 26:     }
 27: 
 28:     // 若消息队列映射拉取Broker存在,则返回映射Broker编号
 29:     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
 30:     if (suggest != null) {
 31:         return suggest.get();
 32:     }
 33: 
 34:     // 返回Broker主节点编号
 35:     return MixAll.MASTER_ID;
 36: }
  • 说明 :计算消息队列拉取消息对应的 Broker 编号。
MQClientInstance#findBrokerAddressInSubscribe(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :获取 Broker 信息(Broker 地址、是否为从节点)。

PullAPIWrapper#processPullResult(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :处理拉取结果。
    • 更新消息队列拉取消息 Broker 编号的映射。
    • 解析消息,并根据订阅信息消息 tagCode匹配合适消息。
  • 第 16 行 :更新消息队列拉取消息 Broker 编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker 编号,会使用更新后的 Broker 编号。
  • 第 18 至 55 行 :解析消息,并根据订阅信息消息 tagCode 匹配合适消息。
    • 第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
    • 第 24 至 35 行 :根据订阅信息tagCode 匹配消息。
    • 第 37 至 43 行 :Hook
    • 第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
    • 第 54 行 :设置消息队列。
  • 第 58 行 :清空消息二进制数组。

ProcessQueue#putMessage(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读

总结

如果用最简单粗暴的方式描述 PullConsumer 拉取消息的过程,那就是如下的代码:

代码语言:javascript
复制
while (true) {
    if (不满足拉取消息) {
        Thread.sleep(间隔);
        continue;
    }
    主动拉取消息();
}

6、PushConsumer 消费消息

DefaultMQPushConsumerImpl消费消息

ConsumeMessageConcurrentlyService 提交消费请求

ConsumeMessageConcurrentlyService#submitConsumeRequest(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :提交立即消费请求。
  • 第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
  • 第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
    • 第 25 至 33 行 :计算当前拆分请求包含的消息。
    • 第 35 至 38 行 :提交拆分消费请求。
    • 第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :提交延迟消费请求。
  • 第 34 行 :直接调用 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。如果消息数超过批量消费上限,会不会是BUG

ConsumeRequest

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :消费请求。提交请求执行消费。
  • 第 24 至 28 行 :废弃处理队列不进行消费。
  • 第 34 至 44 行 :Hook。
  • 第 51 行 :当消息为重试消息,设置 Topic为原始 Topic。例如:原始 TopicTopicTest,重试时 Topic%RETRY%please_rename_unique_group_name_4,经过该方法,Topic 设置回 TopicTest
  • 第 53 至 58 行 :设置开始消费时间。
  • 第 61 行 :进行消费
  • 第 71 至 85 行 :解析消费返回结果类型
  • 第 87 至 90 行 :Hook
  • 第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
  • 第 101 至 106 行 :Hook
  • 第 108 至 110 行 :统计。
  • 第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。

ConsumeMessageConcurrentlyService#processConsumeResult(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :处理消费结果。
  • 第 8 至 10 行 :消费请求消息未空时,直接返回。
  • 第 12 至 32 行 :计算 ackIndex 值。consumeRequest.msgs[0 - ackIndex]为消费成功,需要进行 ack 确认。
    • 第 14 至 23 行 :CONSUME_SUCCESSackIndex = context.getAckIndex()
    • 第 24 至 29 行 :RECONSUME_LATERackIndex = -1
  • 第34 至 63 行 :处理消费失败的消息。
    • 第 43 至 52 行 :发回消费失败的消息到 Broker。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。
    • 第 54 至 59 行 :发回 Broker 失败的消息,直接提交延迟重新消费。
    • 如果发回 Broker 成功,结果因为例如网络异常,导致 Consumer以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。
    • 第 36 至 41 行 :BROADCASTING :广播模式,无论是否消费失败,不发回消息到 Broker,只打印日志。
    • 第 42 至 60 行 :CLUSTERING :集群模式,消费失败的消息发回到 Broker
  • 第 65 至 69 行 :移除**【消费成功】【消费失败但发回Broker成功】**的消息,并更新最新消费进度。
    • 为什么会有**【消费失败但发回Broker成功】的消息?见第 56 行**。
    • ProcessQueue#removeMessage(...)

ProcessQueue#removeMessage(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读

ConsumeMessageConcurrentlyService#cleanExpireMsg(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :定时清理过期消息,默认周期:15min。

ProcessQueue#cleanExpiredMsg(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :移除过期消息。
  • 第 2 至 5 行 :顺序消费时,直接返回。
  • 第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
  • 第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
  • 第 29 行 :发回超时消息到Broker
  • 第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。

7、PushConsumer 发回消费失败消息

DefaultMQPushConsumerImpl#sendMessageBack(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :发回消息。
  • 第 4 至 8 行 :Consumer 发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。
  • 第 10 至 25 行 :发生异常时,Consumer 内置默认 Producer 发送消息。
    • ?疑问:什么样的情况下会发生异常呢?

MQClientAPIImpl#consumerSendMessageBack(...)

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读

8、Consumer 消费进度

OffsetStore

OffsetStore类图.png

  • RemoteBrokerOffsetStoreConsumer 集群模式 下,使用远程 Broker 消费进度。
  • LocalFileOffsetStoreConsumer 广播模式下,使用本地 文件 消费进度。

OffsetStore#load(...)

LocalFileOffsetStore#load(...)
代码语言:javascript
复制
  1: @Override
  2: public void load() throws MQClientException {
  3:     // 从本地硬盘读取消费进度
  4:     OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
  5:     if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
  6:         offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
  7: 
  8:         // 打印每个消息队列的消费进度
  9:         for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
 10:             AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
 11:             log.info("load consumer's offset, {} {} {}",
 12:                 this.groupName,
 13:                 mq,
 14:                 offset.get());
 15:         }
 16:     }
 17: }
  • 说明 :从本地文件加载消费进度到内存。
OffsetSerializeWrapper
代码语言:javascript
复制
  1: public class OffsetSerializeWrapper extends RemotingSerializable {
  2:     private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
  3:             new ConcurrentHashMap<>();
  4: 
  5:     public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
  6:         return offsetTable;
  7:     }
  8: 
  9:     public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
 10:         this.offsetTable = offsetTable;
 11:     }
 12: }
  • 说明 :本地 Offset 存储序列化。
代码语言:javascript
复制
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
 "offsetTable":{{
   "brokerName":"broker-a",
   "queueId":3,
   "topic":"TopicTest"
  }:1470,{
   "brokerName":"broker-a",
   "queueId":2,
   "topic":"TopicTest"
  }:1471,{
   "brokerName":"broker-a",
   "queueId":1,
   "topic":"TopicTest"
  }:1470,{
   "brokerName":"broker-a",
   "queueId":0,
   "topic":"TopicTest"
  }:1470
 }
}
RemoteBrokerOffsetStore#load(...)
代码语言:javascript
复制
  1: @Override
  2: public void load() {
  3: }
  • 说明 :不进行加载,实际读取消费进度时,从 Broker 获取。

OffsetStore#readOffset(...)

读取消费进度类型:

  • READ_FROM_MEMORY :从内存读取。
  • READ_FROM_STORE :从存储( Broker文件 )读取。
  • MEMORY_FIRST_THEN_STORE :优先从内存读取,读取不到,从存储读取。
LocalFileOffsetStore#readOffset(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 第 16 行 :从 文件 读取消费进度。
RemoteBrokerOffsetStore#readOffset(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 第 16 行 :从 Broker 读取消费进度。

OffsetStore#updateOffset(...)

该方法 RemoteBrokerOffsetStoreLocalFileOffsetStore 实现相同。

代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读

OffsetStore#persistAll(...)

LocalFileOffsetStore#persistAll(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :持久化消费进度。将消费进度写入文件
RemoteBrokerOffsetStore#persistAll(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :持久化指定消息队列数组的消费进度到 Broker,并移除非指定消息队列。
MQClientInstance#persistAllConsumerOffset(...)
代码语言:javascript
复制
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • 说明 :定时进行持久化,默认周期:5000ms。
  • 重要说明 :
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

9、结尾

?可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。 感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。?真的有丢丢长。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、概述
  • 2、Consumer
  • 3、PushConsumer 一览
  • 4、PushConsumer 订阅
    • DefaultMQPushConsumerImpl#subscribe(...)
      • FilterAPI.buildSubscriptionData(...)
    • DefaultMQPushConsumer#registerMessageListener(...)
    • 5、PushConsumer 消息队列分配
      • RebalanceService
        • MQClientInstance#doRebalance(...)
          • DefaultMQPushConsumerImpl#doRebalance(...)
            • RebalanceImpl#doRebalance(...)
              • RebalanceImpl#rebalanceByTopic(...)
              • RebalanceImpl#removeUnnecessaryMessageQueue(...)
              • RebalancePushImpl#dispatchPullRequest(...)
              • AllocateMessageQueueStrategy
          • 5、PushConsumer 消费进度读取
            • RebalancePushImpl#computePullFromWhere(...)
              • [PullConsumer] RebalancePullImpl#computePullFromWhere(...)
              • 6、PushConsumer 拉取消息
                • PullMessageService
                  • DefaultMQPushConsumerImpl#pullMessage(...)
                    • PullAPIWrapper#pullKernelImpl(...)
                    • PullAPIWrapper#processPullResult(...)
                    • ProcessQueue#putMessage(...)
                  • 总结
                  • 6、PushConsumer 消费消息
                    • ConsumeMessageConcurrentlyService 提交消费请求
                      • ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
                      • ConsumeMessageConcurrentlyService#submitConsumeRequestLater
                    • ConsumeRequest
                      • ConsumeMessageConcurrentlyService#processConsumeResult(...)
                        • ProcessQueue#removeMessage(...)
                      • ConsumeMessageConcurrentlyService#cleanExpireMsg(...)
                        • ProcessQueue#cleanExpiredMsg(...)
                    • 7、PushConsumer 发回消费失败消息
                      • DefaultMQPushConsumerImpl#sendMessageBack(...)
                        • MQClientAPIImpl#consumerSendMessageBack(...)
                    • 8、Consumer 消费进度
                      • OffsetStore
                        • OffsetStore#load(...)
                        • OffsetStore#readOffset(...)
                        • OffsetStore#updateOffset(...)
                        • OffsetStore#persistAll(...)
                    • 9、结尾
                    相关产品与服务
                    消息队列 CMQ 版
                    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档