本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
MQ 提供了两类消费者:
Push
开头,实际在实现时,使用 Pull
方式实现。通过 Pull
不断不断不断轮询 Broker
获取消息。当不存在新消息时,Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker
主动 Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling
)。本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
PushConsumer手绘图.png
RebalanceService
:均衡消息队列服务,负责分配当前 Consumer
可消费的消息队列( MessageQueue
)。当有新的 Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从 Broker
拉取消息,并提交消费任务到 ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从 Broker
获取消费进度,同步消费进度到 Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对 Namesrv
,Broker
的 API调用,提供给 Producer
、Consumer
使用。 1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
Topic
。Consumer
信息到 Broker
。 1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
Topic
和 订阅表达式 创建订阅数据 1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
RebalanceService&PushConsumer分配队列
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
Consumer
可消费的消息队列( MessageQueue
)。MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:第 25 行
等待超时,每 20s 调用一次。PushConsumer
启动时,调用 rebalanceService#wakeup(...)
触发。Broker
通知 Consumer
加入 或 移除时,Consumer
响应通知,调用 rebalanceService#wakeup(...)
触发。详细解析见:MQClientInstance#doRebalance(...)。
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
Client
包含的 consumerTable
( Consumer
集合 ),执行消息队列分配。consumerTable
只包含 Consumer
自己。?有大大对这个疑问有解答的,烦请解答下。MQConsumerInner#doRebalance(...)
进行队列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...)
详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。 1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }
RebalanceImpl#doRebalance(...)
进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。 1: /**
2: * 执行分配消息队列
3: *
4: * @param isOrder 是否顺序消息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每个 topic 的消息队列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未订阅的topic对应的消息队列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未订阅的消息队列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }
#doRebalance(...)
说明 :执行分配消息队列。subscriptionInner
),分配每一个 Topic
的消息队列。Topic
的消息队列。#truncateMessageQueueNotMyTopic(...)
说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合( subscriptionInner
),对应消息队列移除在该方法。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
#rebalanceByTopic(...)
说明 :分配 Topic
的消息队列。Topic
对应的消息队列和消费者们,并对其进行排序。因为各 Consumer
是在本地分配消息队列,排序后才能保证各 Consumer
顺序一致。AllocateMessageQueueStrategy
) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。Topic
对应的消息队列。BROADCASTING
) 下,分配 Topic
对应的所有消息队列。CLUSTERING
) 下,分配 Topic
对应的部分消息队列。#updateProcessQueueTableInRebalance(...)
说明 :当分配队列时,更新 Topic
对应的消息队列,并返回是否有变更。顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。当前时间 - 最后一次拉取消息时间 > 120s
( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面**#新增队列逻辑#**可以重新加入新的该消息队列。mqSet
) 的 消息处理队列( processQueueTable
)。mqSet
) 新增的消息队列。 1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步队列的消费进度,并移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 顺序消费
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...) 1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。 1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }
PushConsumer
不断不断不断拉取消息的起点。 1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }
PullMessageService
异步执行,非阻塞。详细解析见:PullMessageService。AllocateMessageQueueStrategy类图
1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校验参数是否正确
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26: // 平均分配
27: int index = cidAll.indexOf(currentCID); // 第几个consumer。
28: int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
29: int averageSize =
30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
31: + 1 : mqAll.size() / cidAll.size());
32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
33: int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。
34: for (int i = 0; i < range; i++) {
35: result.add(mqAll.get((startIndex + i) % mqAll.size()));
36: }
37: return result;
38: }
39:
40: @Override
41: public String getName() {
42: return "AVG";
43: }
44: }
[ 0, mod )
:mqAll.size() / cidAll.size() + 1
。前面 mod
个 Consumer
平分余数,多获得 1 个消息队列。[ mod, cidAll.size() )
:mqAll.size() / cidAll.size()
。index
:当前 Consumer
在消费集群里是第几个。这里就是为什么需要对传入的 cidAll
参数必须进行排序的原因。如果不排序,Consumer
在本地计算出来的 index
无法一致,影响计算结果。mod
:余数,即多少消息队列无法平均分配。averageSize
:代码可以简化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
。startIndex
:Consumer
分配消息队列开始位置。range
:分配队列数量。之所以要 Math#min(...)
的原因:当 mqAll.size() <= cidAll.size()
时,最后几个 Consumer
分配不到消息队列。固定消息队列长度为4。
Consumer * 2 可以整除 | Consumer * 3 不可整除 | Consumer * 5 无法都分配 | |
---|---|---|---|
消息队列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息队列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息队列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息队列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
2: /**
3: * 消费者消费brokerName集合
4: */
5: private Set<String> consumeridcs;
6:
7: @Override
8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
9: List<String> cidAll) {
10: // 参数校验
11: List<MessageQueue> result = new ArrayList<MessageQueue>();
12: int currentIndex = cidAll.indexOf(currentCID);
13: if (currentIndex < 0) {
14: return result;
15: }
16: // 计算符合当前配置的消费者数组('consumeridcs')对应的消息队列
17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
18: for (MessageQueue mq : mqAll) {
19: String[] temp = mq.getBrokerName().split("@");
20: if (temp.length == 2 && consumeridcs.contains(temp[0])) {
21: premqAll.add(mq);
22: }
23: }
24: // 平均分配
25: int mod = premqAll.size() / cidAll.size();
26: int rem = premqAll.size() % cidAll.size();
27: int startIndex = mod * currentIndex;
28: int endIndex = startIndex + mod;
29: for (int i = startIndex; i < endIndex; i++) {
30: result.add(mqAll.get(i));
31: }
32: if (rem > currentIndex) {
33: result.add(premqAll.get(currentIndex + mod * cidAll.size()));
34: }
35: return result;
36: }
37:
38: @Override
39: public String getName() {
40: return "MACHINE_ROOM";
41: }
42:
43: public Set<String> getConsumeridcs() {
44: return consumeridcs;
45: }
46:
47: public void setConsumeridcs(Set<String> consumeridcs) {
48: this.consumeridcs = consumeridcs;
49: }
50: }
Broker
对应的消息队列。Broker
对应的消息队列。AllocateMessageQueueAveragely
略有不同,其是将多余的结尾部分分配给前 rem
个 Consumer
。Consumer
和 Broker
分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。 1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校验参数是否正确
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<MessageQueue>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26:
27: // 环状分配
28: int index = cidAll.indexOf(currentCID);
29: for (int i = index; i < mqAll.size(); i++) {
30: if (i % cidAll.size() == index) {
31: result.add(mqAll.get(i));
32: }
33: }
34: return result;
35: }
36:
37: @Override
38: public String getName() {
39: return "AVG_BY_CIRCLE";
40: }
41: }
1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
2: private List<MessageQueue> messageQueueList;
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: return this.messageQueueList;
8: }
9:
10: @Override
11: public String getName() {
12: return "CONFIG";
13: }
14:
15: public List<MessageQueue> getMessageQueueList() {
16: return messageQueueList;
17: }
18:
19: public void setMessageQueueList(List<MessageQueue> messageQueueList) {
20: this.messageQueueList = messageQueueList;
21: }
22: }
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
PushConsumer
读取消费进度有三种选项:CONSUME_FROM_LAST_OFFSET
:第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
:第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
:第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。[PullConsumer]
RebalancePullImpl#computePullFromWhere(...)暂时跳过。?
DefaultMQPushConsumerImpl拉取消息
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
拉取消息,并提交消费任务到 ConsumeMessageService
。#executePullRequestLater(...)
:第 26 至 40 行 :提交延迟拉取消息请求。#executePullRequestImmediately(...)
:第 42 至 53 行 :提交立即拉取消息请求。#executeTaskLater(...)
:第 55 至 63 行 :提交延迟任务。#pullMessage(...)
:第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。#run(...)
:第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue
),进行消息拉取。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
#pullMessage(...)
说明 :拉取消息。Broker
处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。Consumer
未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。Consumer
处于暂停中,不进行消息拉取,提交延迟拉取消息请求。Consumer
为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。Topic
对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。Consumer
本地的订阅信息( SubscriptionData
),而不使用 Broker
里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。PullCallback
:拉取消息回调:FOUND
) :NO_NEW_MSG
) :NO_MATCHED_MSG
)。逻辑同 NO_NEW_MSG
。OFFSET_ILLEGAL
)。ConsumeMessageService
。详细解析见:ConsumeMessageConcurrentlyService。pullInterval
),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。#correctTagsOffset(...)
。Broker
。dropped
。#correctTagsOffset(...)
:更正消费进度。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
信息(Broker
地址、是否为从节点)。Broker
信息不存在,则抛出异常。 1: /**
2: * 消息队列 与 拉取Broker 的映射
3: * 当拉取消息时,会通过该映射获取拉取请求对应的Broker
4: */
5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
6: new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
7: /**
8: * 是否使用默认Broker
9: */
10: private volatile boolean connectBrokerByUser = false;
11: /**
12: * 默认Broker编号
13: */
14: private volatile long defaultBrokerId = MixAll.MASTER_ID;
15:
16: /**
17: * 计算消息队列拉取消息对应的Broker编号
18: *
19: * @param mq 消息队列
20: * @return Broker编号
21: */
22: public long recalculatePullFromWhichNode(final MessageQueue mq) {
23: // 若开启默认Broker开关,则返回默认Broker编号
24: if (this.isConnectBrokerByUser()) {
25: return this.defaultBrokerId;
26: }
27:
28: // 若消息队列映射拉取Broker存在,则返回映射Broker编号
29: AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
30: if (suggest != null) {
31: return suggest.get();
32: }
33:
34: // 返回Broker主节点编号
35: return MixAll.MASTER_ID;
36: }
Broker
编号。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
信息(Broker
地址、是否为从节点)。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
编号的映射。tagCode
匹配合适消息。Broker
编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker
编号,会使用更新后的 Broker
编号。tagCode
匹配合适消息。tagCode
匹配消息。Hook
。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
如果用最简单粗暴的方式描述 PullConsumer
拉取消息的过程,那就是如下的代码:
while (true) {
if (不满足拉取消息) {
Thread.sleep(间隔);
continue;
}
主动拉取消息();
}
DefaultMQPushConsumerImpl消费消息
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
。如果消息数超过批量消费上限,会不会是BUG。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Topic
为原始 Topic
。例如:原始 Topic
为 TopicTest
,重试时 Topic
为 %RETRY%please_rename_unique_group_name_4
,经过该方法,Topic
设置回 TopicTest
。Hook
。Hook
。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
ackIndex
值。consumeRequest.msgs[0 - ackIndex]
为消费成功,需要进行 ack
确认。CONSUME_SUCCESS
:ackIndex = context.getAckIndex()
。RECONSUME_LATER
:ackIndex = -1
。Broker
。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。Broker
失败的消息,直接提交延迟重新消费。Broker
成功,结果因为例如网络异常,导致 Consumer
以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。BROADCASTING
:广播模式,无论是否消费失败,不发回消息到 Broker
,只打印日志。CLUSTERING
:集群模式,消费失败的消息发回到 Broker
。Broker
成功】**的消息,并更新最新消费进度。Broker
成功】的消息?见第 56 行**。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Consumer
发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。Consumer
内置默认 Producer
发送消息。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
OffsetStore类图.png
RemoteBrokerOffsetStore
:Consumer
集群模式 下,使用远程 Broker
消费进度。LocalFileOffsetStore
:Consumer
广播模式下,使用本地 文件
消费进度。 1: @Override
2: public void load() throws MQClientException {
3: // 从本地硬盘读取消费进度
4: OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
5: if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
6: offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
7:
8: // 打印每个消息队列的消费进度
9: for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
10: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
11: log.info("load consumer's offset, {} {} {}",
12: this.groupName,
13: mq,
14: offset.get());
15: }
16: }
17: }
1: public class OffsetSerializeWrapper extends RemotingSerializable {
2: private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
3: new ConcurrentHashMap<>();
4:
5: public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
6: return offsetTable;
7: }
8:
9: public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
10: this.offsetTable = offsetTable;
11: }
12: }
Offset
存储序列化。Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":3,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:1471,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:1470
}
}
1: @Override
2: public void load() {
3: }
Broker
获取。读取消费进度类型:
READ_FROM_MEMORY
:从内存读取。READ_FROM_STORE
:从存储( Broker
或 文件
)读取。MEMORY_FIRST_THEN_STORE
:优先从内存读取,读取不到,从存储读取。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
文件
读取消费进度。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
读取消费进度。该方法 RemoteBrokerOffsetStore
与 LocalFileOffsetStore
实现相同。
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
Broker
,并移除非指定消息队列。// ... 文章过长,超过限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
?可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。 感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。?真的有丢丢长。