Abstract: original source http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ by 「芋道源码」. Reposting is welcome; please keep this abstract. Thanks!
This chapter analyzes the source code involved in the consume logic. Because of its length, it is split into two parts:

- `Broker`-side source code.
- `Consumer`-side source code.

This article is the first part.
OK, first look at the consume-logic diagram:

[Figure: consume logic diagram]

Next, a simplified sequence diagram of the consume flow (reality differs slightly):

[Figure: Consumer&Broker消费精简图.png]
The relationship among `ConsumeQueue`, `MappedFileQueue`, and `MappedFile` is:

`ConsumeQueue` : `MappedFileQueue` : `MappedFile` = 1 : 1 : N.
Reflected in the file system:
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000
`ConsumeQueue`, `MappedFileQueue`, and `MappedFile` are defined as follows:

- `MappedFile`: a single file such as 00000000000000000000.
- `MappedFileQueue`: the folder holding the `MappedFile`s. It wraps them into a file queue, presenting the upper layer with effectively unlimited file capacity. All `MappedFile`s in one queue share a uniform size; for `ConsumeQueue` the default is 6000000 B (300000 entries × 20 B).
- `ConsumeQueue`: a wrapper over `MappedFileQueue`. Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>.

The content a `ConsumeQueue` stores in a `MappedFile` must be units of 20 B (`ConsumeQueue.CQ_STORE_UNIT_SIZE`). There are two unit types:

- `MESSAGE_POSITION_INFO`: message position info.
- `BLANK`: leading blank placeholder at the front of a file. When historical messages have been deleted, `BLANK` stands in for the deleted ones.

Layout of `MESSAGE_POSITION_INFO` in the `ConsumeQueue`:
Position | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | offset | message's storage position in the CommitLog | Long | 8 |
2 | size | message length | Int | 4 |
3 | tagsCode | message tagsCode | Long | 8 |
Layout of `BLANK` in the `ConsumeQueue`:

Position | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | offset | fixed to 0 | Long | 8 |
2 | size | fixed to Integer.MAX_VALUE | Int | 4 |
3 | tagsCode | fixed to 0 | Long | 8 |
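To make the 20 B unit layout concrete, here is a small standalone helper, hypothetical and not part of RocketMQ, that encodes these units with `java.nio.ByteBuffer` following the two tables above:

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not RocketMQ code) mirroring the 20-byte ConsumeQueue unit layout.
public class CqUnit {
    public static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode)

    // Encode a MESSAGE_POSITION_INFO unit: offset (Long) + size (Int) + tagsCode (Long).
    public static byte[] encode(long offset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(offset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    // A BLANK unit is 0L + Integer.MAX_VALUE + 0L, matching fillPreBlank(...) shown later.
    public static byte[] blank() {
        return encode(0L, Integer.MAX_VALUE, 0L);
    }

    // Tell a BLANK placeholder from a real position entry by decoding the first two fields.
    public static boolean isBlank(byte[] unit) {
        ByteBuffer buf = ByteBuffer.wrap(unit);
        return buf.getLong() == 0L && buf.getInt() == Integer.MAX_VALUE;
    }
}
```

Reading a unit back is simply the reverse: `getLong()`, `getInt()`, `getLong()` in that order.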
[Figure: CommitLog replayed into ConsumeQueue]

There are two main components:

- `ReputMessageService`: writes the ConsumeQueue.
- `FlushConsumeQueueService`: flushes the ConsumeQueue.

[Figure: ReputMessageService sequence diagram]
1: class ReputMessageService extends ServiceThread {
2:
3: /**
4: * CommitLog physical offset from which message replay starts
5: */
6: private volatile long reputFromOffset = 0;
7:
8: public long getReputFromOffset() {
9: return reputFromOffset;
10: }
11:
12: public void setReputFromOffset(long reputFromOffset) {
13: this.reputFromOffset = reputFromOffset;
14: }
15:
16: @Override
17: public void shutdown() {
18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
19: try {
20: Thread.sleep(100);
21: } catch (InterruptedException ignored) {
22: }
23: }
24:
25: if (this.isCommitLogAvailable()) {
26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
28: }
29:
30: super.shutdown();
31: }
32:
33: /**
34: * Bytes of messages still to be replayed
35: *
36: * @return byte count
37: */
38: public long behind() {
39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
40: }
41:
42: /**
43: * Whether the commitLog still has messages to replay
44: *
45: * @return whether
46: */
47: private boolean isCommitLogAvailable() {
48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
49: }
50:
51: private void doReput() {
52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
53:
54: // TODO question: what is this for?
55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
57: break;
58: }
59:
60: // Get the MappedByteBuffer of the commitLog MappedFile starting at reputFromOffset
61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
62: if (result != null) {
63: try {
64: this.reputFromOffset = result.getStartOffset();
65:
66: // Iterate over the MappedByteBuffer
67: for (int readSize = 0; readSize < result.getSize() && doNext; ) {
68: // Build the dispatch request for replaying this message
69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
70: int size = dispatchRequest.getMsgSize(); // message size
71: // Handle the request result
72: if (dispatchRequest.isSuccess()) { // read succeeded
73: if (size > 0) { // read a Message
74: DefaultMessageStore.this.doDispatch(dispatchRequest);
75: // Notify that a new message has arrived
76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
80: dispatchRequest.getTagsCode());
81: }
82: // FIXED BUG By shijia
83: this.reputFromOffset += size;
84: readSize += size;
85: // Statistics
86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
87: DefaultMessageStore.this.storeStatsService
88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
89: DefaultMessageStore.this.storeStatsService
90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
91: .addAndGet(dispatchRequest.getMsgSize());
92: }
93: } else if (size == 0) { // reached the end of the MappedFile
94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
95: readSize = result.getSize();
96: }
97: } else if (!dispatchRequest.isSuccess()) { // read failed
98: if (size > 0) { // a Message-sized entry that is not a valid Message
99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100: this.reputFromOffset += size;
101: } else { // a Blank-sized entry that is not a valid Blank
102: doNext = false;
103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105: this.reputFromOffset);
106:
107: this.reputFromOffset += result.getSize() - readSize;
108: }
109: }
110: }
111: }
112: } finally {
113: result.release();
114: }
115: } else {
116: doNext = false;
117: }
118: }
119: }
120:
121: @Override
122: public void run() {
123: DefaultMessageStore.log.info(this.getServiceName() + " service started");
124:
125: while (!this.isStopped()) {
126: try {
127: Thread.sleep(1);
128: this.doReput();
129: } catch (Exception e) {
130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131: }
132: }
133:
134: DefaultMessageStore.log.info(this.getServiceName() + " service end");
135: }
136:
137: @Override
138: public String getServiceName() {
139: return ReputMessageService.class.getSimpleName();
140: }
141:
142: }
[Figure: ReputMessageService use cases]

- Gets the `MappedByteBuffer` of the CommitLog `MappedFile` starting at `reputFromOffset`, and iterates over that `MappedByteBuffer`.
- Builds a replay dispatch request (`DispatchRequest`). The request mainly carries the basic information of either one message (`Message`) or a file tail (`BLANK`).
- On reading a `Message`, dispatches it to generate the corresponding `ConsumeQueue` and `IndexFile` content. Detailed analysis: see `doDispatch(...)` below.
- When the `Broker` is the master node && long polling is enabled, notifies the consume queue that a new message has arrived: `NotifyMessageArrivingListener` calls `PullRequestHoldService#notifyMessageArriving(...)`. Detailed analysis: PullRequestHoldService.
- On reading a `Blank`, i.e. a file tail, jumps to the next `MappedFile`.
- On `shutdown`, sleeps 100 ms repeatedly until the `CommitLog` has been replayed to the latest position; if replay is still unfinished, a warning is logged.

1: /**
2: * Execute a dispatch request
3: * 1. For a non-transactional or transaction-commit message, build the message position info into the ConsumeQueue
4: * 2. Build the index info into the IndexFile
5: *
6: * @param req dispatch request
7: */
8: public void doDispatch(DispatchRequest req) {
9: // For a non-transactional or transaction-commit message, build the message position info into the ConsumeQueue
10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
11: switch (tranType) {
12: case MessageSysFlag.TRANSACTION_NOT_TYPE:
13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
16: break;
17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
19: break;
20: }
21: // Build the index info into the IndexFile
22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
23: DefaultMessageStore.this.indexService.buildIndex(req);
24: }
25: }
26:
27: /**
28: * Build the message position info into the ConsumeQueue
29: *
30: * @param topic topic
31: * @param queueId queue id
32: * @param offset commitLog storage position
33: * @param size message length
34: * @param tagsCode message tagsCode
35: * @param storeTimestamp store timestamp
36: * @param logicOffset queue position
37: */
38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
39: long logicOffset) {
40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
42: }
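Only non-transactional and transaction-commit messages reach the `ConsumeQueue`; prepared and rollback messages are skipped. A sketch of that gate follows. Note: the constant values below are assumptions modeled on `MessageSysFlag` in rocketmq-common, which this excerpt only names, not defines:

```java
// Hedged sketch of the transaction gate in doDispatch(...); constant values are assumed.
public class DispatchGate {
    public static final int TRANSACTION_NOT_TYPE      = 0;
    public static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public static final int TRANSACTION_COMMIT_TYPE   = 0x2 << 2;
    public static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Extract the transaction bits from the sysFlag.
    public static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Only non-transactional and commit messages get position info in the ConsumeQueue.
    public static boolean visibleToConsumeQueue(int sysFlag) {
        int t = getTransactionValue(sysFlag);
        return t == TRANSACTION_NOT_TYPE || t == TRANSACTION_COMMIT_TYPE;
    }
}
```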
1: /**
2: * Wrapper for adding position info
3: *
4: * @param offset commitLog storage position
5: * @param size message length
6: * @param tagsCode message tagsCode
7: * @param storeTimestamp message store timestamp
8: * @param logicOffset queue position
9: */
10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
11: long logicOffset) {
12: final int maxRetries = 30;
13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
14: // Retry the write in a loop until it succeeds
15: for (int i = 0; i < maxRetries && canWrite; i++) {
16: // Call the actual position-info write
17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
18: if (result) {
19: // Write succeeded; use the message store timestamp as the store checkpoint.
20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
21: return;
22: } else {
23: // XXX: warn and notify me
24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
25: + " failed, retry " + i + " times");
26:
27: try {
28: Thread.sleep(1000);
29: } catch (InterruptedException e) {
30: log.warn("", e);
31: }
32: }
33: }
34:
35: // XXX: warn and notify me. Mark the logic queue as not writable due to the error.
36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
38: }
39:
40: /**
41: * Add position info, returning whether the add succeeded
42: *
43: * @param offset commitLog storage position
44: * @param size message length
45: * @param tagsCode message tagsCode
46: * @param cqOffset queue position
47: * @return whether it succeeded
48: */
49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
50: final long cqOffset) {
51: // Already replayed; return success directly
52: if (offset <= this.maxPhysicOffset) {
53: return true;
54: }
55: // Write the position info into the byteBuffer
56: this.byteBufferIndex.flip();
57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
58: this.byteBufferIndex.putLong(offset);
59: this.byteBufferIndex.putInt(size);
60: this.byteBufferIndex.putLong(tagsCode);
61: // Compute the consumeQueue storage position and get the corresponding MappedFile
62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
64: if (mappedFile != null) {
65: // When this is the ConsumeQueue's first MappedFile && the queue position is not the first && the MappedFile has no content yet, fill the leading blank placeholders
66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO question: why is this needed? One imaginable case: a topic sends nothing for a long time, then suddenly sends again, hitting exactly this condition.
67: this.minLogicOffset = expectLogicOffset;
68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
70: this.fillPreBlank(mappedFile, expectLogicOffset);
71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
72: + mappedFile.getWrotePosition());
73: }
74: // Check that the consumeQueue storage position is valid. TODO: if it is not, is continuing to write a problem?
75: if (cqOffset != 0) {
76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
77: if (expectLogicOffset != currentLogicOffset) {
78: LOG_ERROR.warn(
79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
80: expectLogicOffset,
81: currentLogicOffset,
82: this.topic,
83: this.queueId,
84: expectLogicOffset - currentLogicOffset
85: );
86: }
87: }
88: // Record the max commitLog position replayed into the ConsumeQueue.
89: this.maxPhysicOffset = offset;
90: // Append to the mappedFile
91: return mappedFile.appendMessage(this.byteBufferIndex.array());
92: }
93: return false;
94: }
95:
96: /**
97: * Fill the leading blank placeholders
98: *
99: * @param mappedFile MappedFile
100: * @param untilWhere consumeQueue storage position
101: */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103: // Write a blank placeholder unit into the byteBuffer
104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105: byteBuffer.putLong(0L);
106: byteBuffer.putInt(Integer.MAX_VALUE);
107: byteBuffer.putLong(0L);
108: // Fill blanks in a loop
109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111: mappedFile.appendMessage(byteBuffer.array());
112: }
113: }
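Before the walkthrough that follows, the offset arithmetic used by `putMessagePositionInfo(...)` and `fillPreBlank(...)` can be sketched standalone. This is a hypothetical helper; the 6000000 B file size and the 20-digit, offset-based file naming follow the defaults shown earlier in this article:

```java
// Hypothetical helper illustrating the ConsumeQueue offset arithmetic above.
public class CqOffsets {
    public static final int CQ_STORE_UNIT_SIZE = 20;
    public static final int MAPPED_FILE_SIZE = 300000 * CQ_STORE_UNIT_SIZE; // 6000000 B default

    // A queue position (cqOffset) maps to a fixed byte position in the logical queue.
    public static long expectLogicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    // Byte offset within the MappedFile holding this position (cf. untilWhere % mappedFileSize).
    public static int offsetInFile(long logicOffset) {
        return (int) (logicOffset % MAPPED_FILE_SIZE);
    }

    // MappedFiles are named by their starting byte offset, zero-padded to 20 digits,
    // e.g. 00000000000000000000 in the directory listing shown earlier.
    public static String fileName(long logicOffset) {
        long fileFromOffset = logicOffset - (logicOffset % MAPPED_FILE_SIZE);
        return String.format("%020d", fileFromOffset);
    }
}
```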
`#putMessagePositionInfoWrapper(...)`
Description: the wrapper for adding position info to the `ConsumeQueue`; the actual work is done by calling `#putMessagePositionInfo(...)`.

- Checks whether the `ConsumeQueue` is writable. When a bug has occurred, writing is not allowed.
- Calls `#putMessagePositionInfo(...)` to add the position info.
- For a detailed analysis of `StoreCheckpoint`, see: Store initialization and shutdown.
- On a `ConsumeQueue` write exception, marks it so that further writes are not allowed.

`#putMessagePositionInfo(...)`
Description: adds position info to the `ConsumeQueue` and returns whether the add succeeded.

- On the leading-blank fill: for example, a `Topic` produces no messages for a long time and suddenly sends again N days later; its historical messages and consume-queue data have already been cleaned up, so the newly created `MappedFile` needs leading placeholders.
- When `offset` (storage position) is less than or equal to `maxPhysicOffset` (the largest `CommitLog` storage position already replayed into the `ConsumeQueue`), the entry has already been replayed; it is not written again, and success is returned directly.
- Computes the `ConsumeQueue` storage position and gets the corresponding `MappedFile`.
- When the `MappedFile` is the `ConsumeQueue`'s current first file && the `MappedFile` has no content yet && the replayed queue position is greater than 0, fills the `MappedFile` with leading `BLANK` placeholders.
- Checks whether the `ConsumeQueue` storage position is valid; if not, logs it.
- Records the largest `CommitLog` position replayed into the `ConsumeQueue`.
- Appends to the `MappedFile`.

1: class FlushConsumeQueueService extends ServiceThread {
2: private static final int RETRY_TIMES_OVER = 3;
3: /**
4: * Last flush timestamp
5: */
6: private long lastFlushTimestamp = 0;
7:
8: private void doFlush(int retryTimes) {
9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
10:
11: // When retryTimes == RETRY_TIMES_OVER, force the flush. Mainly used on shutdown.
12: if (retryTimes == RETRY_TIMES_OVER) {
13: flushConsumeQueueLeastPages = 0;
14: }
15: // When flushConsumeQueueThoroughInterval has elapsed, flush even if fewer than flushConsumeQueueLeastPages pages are dirty
16: long logicsMsgTimestamp = 0;
17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
18: long currentTimeMillis = System.currentTimeMillis();
19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
20: this.lastFlushTimestamp = currentTimeMillis;
21: flushConsumeQueueLeastPages = 0;
22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
23: }
24: // Flush the consume queues
25: ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
26: for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
27: for (ConsumeQueue cq : maps.values()) {
28: boolean result = false;
29: for (int i = 0; i < retryTimes && !result; i++) {
30: result = cq.flush(flushConsumeQueueLeastPages);
31: }
32: }
33: }
34: // Flush the store checkpoint
35: if (0 == flushConsumeQueueLeastPages) {
36: if (logicsMsgTimestamp > 0) {
37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
38: }
39: DefaultMessageStore.this.getStoreCheckpoint().flush();
40: }
41: }
42:
43: public void run() {
44: DefaultMessageStore.log.info(this.getServiceName() + " service started");
45:
46: while (!this.isStopped()) {
47: try {
48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
49: this.waitForRunning(interval);
50: this.doFlush(1);
51: } catch (Exception e) {
52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
53: }
54: }
55:
56: this.doFlush(RETRY_TIMES_OVER);
57:
58: DefaultMessageStore.log.info(this.getServiceName() + " service end");
59: }
60:
61: @Override
62: public String getServiceName() {
63: return FlushConsumeQueueService.class.getSimpleName();
64: }
65:
66: @Override
67: public long getJointime() {
68: return 1000 * 60;
69: }
70: }
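The threshold decision in `doFlush(...)` above can be extracted into a minimal sketch (names here are illustrative, not RocketMQ's): a `leastPages` of 0 forces a full flush, otherwise flushing only happens when enough dirty pages accumulated.

```java
// Minimal sketch of FlushConsumeQueueService#doFlush's threshold logic.
public class FlushPolicy {
    // Returns the leastPages value to pass to flush: 0 forces a full flush.
    public static int leastPages(int configuredLeastPages, int retryTimes, int retryTimesOver,
                                 long now, long lastFlushTimestamp, int thoroughIntervalMs) {
        if (retryTimes == retryTimesOver) {
            return 0; // forced flush, used on shutdown
        }
        if (now >= lastFlushTimestamp + thoroughIntervalMs) {
            return 0; // thorough interval elapsed: flush even with few dirty pages
        }
        return configuredLeastPages; // normal case: only flush if enough dirty pages
    }
}
```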
The thread service that flushes the `ConsumeQueue`s (consume queues).

- When `retryTimes == RETRY_TIMES_OVER`, the flush is forced. Used on `shutdown`.
- Flushes each `ConsumeQueue` (consume queue).
- Flushes the `StoreCheckpoint`. For a detailed analysis of `StoreCheckpoint`, see: Store initialization and shutdown.
- Waits for the configured interval before each `flush`. If `wakeup()` is called, a `flush` runs immediately; currently, nothing calls `wakeup()`.

1: public class PullMessageRequestHeader implements CommandCustomHeader {
2: /**
3: * Consumer group
4: */
5: @CFNotNull
6: private String consumerGroup;
7: /**
8: * Topic
9: */
10: @CFNotNull
11: private String topic;
12: /**
13: * Queue id
14: */
15: @CFNotNull
16: private Integer queueId;
17: /**
18: * Queue offset to start pulling from
19: */
20: @CFNotNull
21: private Long queueOffset;
22: /**
23: * Max number of messages to pull
24: */
25: @CFNotNull
26: private Integer maxMsgNums;
27: /**
28: * System flag
29: */
30: @CFNotNull
31: private Integer sysFlag;
32: /**
33: * Consume progress position to commit
34: */
35: @CFNotNull
36: private Long commitOffset;
37: /**
38: * Suspend timeout
39: */
40: @CFNotNull
41: private Long suspendTimeoutMillis;
42: /**
43: * Subscription expression
44: */
45: @CFNullable
46: private String subscription;
47: /**
48: * Subscription version
49: */
50: @CFNotNull
51: private Long subVersion;
52: }
- `FLAG_COMMIT_OFFSET`: marks that the request commits a consume progress position; used together with `commitOffset`.
- `FLAG_SUSPEND`: marks whether the request may be suspended; used together with `suspendTimeoutMillis`. When no message can be pulled, the `Broker` suspends the request until a message arrives, for at most `suspendTimeoutMillis` milliseconds.
- `FLAG_SUBSCRIPTION`: whether to filter by the subscription expression; used together with `subscription`.

// ... The article exceeds WeChat's length limit; read the full text at http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
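A hedged sketch of how these `sysFlag` bits combine; the exact bit positions are assumptions modeled on `PullSysFlag` in rocketmq-common, which this excerpt does not show:

```java
// Hedged sketch of the pull sysFlag bits; bit positions are assumed, not quoted.
public class PullFlags {
    public static final int FLAG_COMMIT_OFFSET = 0x1;      // carry commitOffset
    public static final int FLAG_SUSPEND       = 0x1 << 1; // allow the Broker to hold the request
    public static final int FLAG_SUBSCRIPTION  = 0x1 << 2; // carry the subscription expression

    public static int build(boolean commitOffset, boolean suspend, boolean subscription) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend)      flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        return flag;
    }

    // The Broker tests this bit before deciding to hold a pull request.
    public static boolean hasSuspend(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) != 0;
    }
}
```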
- Checks whether the `Broker` is readable.
- Checks whether the `SubscriptionGroupConfig` (subscription group config) exists && consuming is allowed.
- Parses the flag bits of `PullMessageRequestHeader.sysFlag`.
- Checks whether the `TopicConfig` (topic config) exists && is readable && the queue id is valid.
- Checks whether the `SubscriptionData` (subscription info) is valid.
- Calls `MessageStore#getMessage(...)` to get the `GetMessageResult` (messages). Detailed analysis: MessageStore#getMessage(...).
- Sets the `brokerId`.
- `Hook` logic: `#executeConsumeMessageHookBefore(...)`.
- `readGetMessageResult(...)` reads the message content into heap memory and sets it as the response body. A zero-copy implementation could respond directly, without heap memory, for better performance. TODO: to be expanded once zero-copy has been studied more.
- When (the `Broker` allows suspending && the request asks to suspend), suspends the request. Detailed analysis: PullRequestHoldService.
- To be expanded after studying the `tools` module.
- When (the `Broker` is not master && the request asks to persist progress), persists the consume progress. Detailed analysis: updating consume progress.

// ... The article exceeds WeChat's length limit; read the full text at http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
Gets the specified number (`maxMsgNums`) of messages (`Message`) by consumer group (`group`) + topic (`Topic`) + queue id (`queueId`) + queue position (`offset`) + subscription info (`subscriptionData`).

- Checks whether the `Store` is shut down; if it is, messages cannot be fetched.
- Gets the consume queue (`ConsumeQueue`) by topic (`Topic`) + queue id (`queueId`). `#findConsumeQueue(...)`: lines 159 to 196.
- When no message can be read at the queue position (`offset`), computes the next pull position for the `Client` according to the case: the queue position (`offset`) is too small; the queue position (`offset`) exactly equals the queue's maximum position, which is normal and amounts to querying the newest message; or the queue position (`offset`) is too large. `#nextOffsetCorrection(...)`: lines 198 to 212.
- Gets the `MappedFile` holding the message position info for the queue position (`offset`). `#checkInDiskByCommitOffset(...)`: lines 214 to 224. `#isTheBatchFull(...)`: lines 226 to 264.
- Iterates over the message position info. When `offsetPy` is smaller than `nextPhyFileStartOffset`, the corresponding `Message` has been removed, so it simply continues until a readable `Message`.
- Gets the message's `MappedByteBuffer` from the `CommitLog`; this either succeeds or fails. On failure, the message cannot be read from the `CommitLog`, meaning its file (`MappedFile`) has been deleted; the starting position of the next `MappedFile` is computed. This logic should be read together with lines 79 to 83.
- Releases `bufferConsumeQueue`'s reference to its `MappedFile`. The `MappedFile` here is a file of the `ConsumeQueue`, not of the `CommitLog`.
- When the `MappedFile` for the queue position (`offset`) is null, computes the position of the `ConsumeQueue`'s next `MappedFile` starting from `offset`.

1: public class DefaultMessageFilter implements MessageFilter {
2:
3: @Override
4: public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
5: // message tagsCode is null
6: if (tagsCode == null) {
7: return true;
8: }
9: // subscription data is null
10: if (null == subscriptionData) {
11: return true;
12: }
13: // classFilter
14: if (subscriptionData.isClassFilterMode())
15: return true;
16: // subscription expression matches everything
17: if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
18: return true;
19: }
20: // whether the subscription's code set contains the message tagsCode
21: return subscriptionData.getCodeSet().contains(tagsCode.intValue());
22: }
23:
24: }
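The final check in `DefaultMessageFilter` compares the stored `tagsCode` against the subscriber's code set. A standalone sketch, assuming tags are hashed with `String.hashCode()` (the actual hashing lives in RocketMQ's subscription-parsing code, which this excerpt does not show):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: at the ConsumeQueue level, tags are compared by hash code. The tagsCode stored
// in the 20-byte unit is the hash of the message tag; the consumer's subscribed tags are
// hashed into codeSet. String.hashCode() is an assumed stand-in for the real scheme.
public class TagFilter {
    public static Set<Integer> codeSet(String... subscribedTags) {
        Set<Integer> codes = new HashSet<>();
        for (String tag : subscribedTags) {
            codes.add(tag.hashCode());
        }
        return codes;
    }

    // Mirrors DefaultMessageFilter's last check: codeSet.contains(tagsCode.intValue()).
    public static boolean matched(Set<Integer> codeSet, long tagsCode) {
        return codeSet.contains((int) tagsCode);
    }
}
```

Because only hashes are stored in the ConsumeQueue, hash collisions are possible here; exact tag filtering happens again on the consumer side.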
// ... The article exceeds WeChat's length limit; read the full text at http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
`PullRequestHoldService`
Description: the thread service that maintains suspended pull-message requests.

- `#suspendPullRequest(...)`: adds a suspended pull request to the collection (`pullRequestTable`).
- `#run(...)`: periodically checks whether any suspended request should be notified to re-pull messages, and notifies it. Long polling and short polling use different wait times.
- `#checkHoldRequest(...)`: iterates the suspended requests, checking which need to be notified.
- `#notifyMessageArriving(...)`: checks whether the given queue has requests that need notifying. If `maxOffset` is too small, it re-reads the latest value once. Notification goes through `#executeRequestWhenWakeup(...)`, which actually hands the pull off to a thread pool for asynchronous execution, so there is no performance concern; detailed analysis: PullMessageProcessor#executeRequestWhenWakeup(...). The notified request is removed from the collection (`pullRequestTable`).

1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
2: Runnable run = new Runnable() {
3: @Override
4: public void run() {
5: try {
6: // Process the pull request. This call does not allow the request to be suspended again.
7: final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
8:
9: if (response != null) {
10: response.setOpaque(request.getOpaque());
11: response.markResponseType();
12: try {
13: channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
14: @Override
15: public void operationComplete(ChannelFuture future) throws Exception {
16: if (!future.isSuccess()) {
17: LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
18: LOG.error(request.toString());
19: LOG.error(response.toString());
20: }
21: }
22: });
23: } catch (Throwable e) {
24: LOG.error("ProcessRequestWrapper process request over, but response failed", e);
25: LOG.error(request.toString());
26: LOG.error(response.toString());
27: }
28: }
29: } catch (RemotingCommandException e1) {
30: LOG.error("ExecuteRequestWhenWakeup run", e1);
31: }
32: }
33: };
34: // Submit the pull task to the thread pool
35: this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
36: }
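A minimal, hypothetical sketch (none of these are RocketMQ's classes) of the suspend/notify bookkeeping that `PullRequestHoldService` performs around this method: requests wait keyed by topic@queueId, and a replay notification wakes those whose requested offset is now behind `maxOffset`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of long-polling bookkeeping, not RocketMQ's implementation.
public class HoldTable {
    // topic@queueId -> queue offsets that suspended pulls are waiting on
    private final Map<String, List<Long>> table = new ConcurrentHashMap<>();

    private static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    // Called when a pull finds no message and the request allows suspension.
    public void suspend(String topic, int queueId, long pullFromOffset) {
        table.computeIfAbsent(key(topic, queueId), k -> new ArrayList<>()).add(pullFromOffset);
    }

    // Called when replay reports a new maxOffset; returns the pulls to wake up.
    public List<Long> notifyArriving(String topic, int queueId, long maxOffset) {
        List<Long> waiting = table.getOrDefault(key(topic, queueId), new ArrayList<>());
        List<Long> wakeUp = new ArrayList<>();
        for (Long wanted : waiting) {
            if (maxOffset > wanted) { // messages beyond the requested offset now exist
                wakeUp.add(wanted);
            }
        }
        waiting.removeAll(wakeUp);
        return wakeUp;
    }
}
```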
- This call passes `false` for "allow suspend", so the re-triggered pull cannot be suspended again and loop forever inside the `Broker`.

Consume progress is persisted under the store's config directory:

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
}
- `consumerOffset.json`: the consume-progress storage file.
- `consumerOffset.json.bak`: backup of the consume-progress storage file. When persisting `consumerOffset.json`, the previous content is backed up to `consumerOffset.json.bak`. Implementation: MixAll#string2File(...).

1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
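The persist call scheduled above writes consumerOffset.json and keeps the previous content as consumerOffset.json.bak. A hypothetical sketch of that backup-then-write behavior (a stand-in, not RocketMQ's MixAll#string2File(...) itself):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the write-with-backup behavior described above.
public class OffsetPersister {
    public static void persist(Path file, String json) throws IOException {
        if (Files.exists(file)) {
            // Keep the previous content as <name>.bak before overwriting.
            Path bak = file.resolveSibling(file.getFileName() + ".bak");
            Files.copy(file, bak, StandardCopyOption.REPLACE_EXISTING);
        }
        Files.write(file, json.getBytes("UTF-8"));
    }
}
```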
// ... The article exceeds WeChat's length limit; read the full text at http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
Most of this logic is similar to the `Broker`'s [receive message] interface; it helps to read that content first.

// ... The article exceeds WeChat's length limit; read the full text at http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
When a `Consumer` fails to consume a message, it calls this interface to send the message back. The `Broker` stores the sent-back message, so that the next time the `Consumer` pulls it, it can be read sequentially from the `CommitLog` and `ConsumeQueue`.

The flow is very similar to the `Broker` receiving an ordinary message; the logic unique to send-back is marked with TODO.

- Checks whether the `Broker` has write permission.
- Computes `retryQueueNums`.
- Sets `sysFlag`.
- Gets the `TopicConfig`; if it does not exist, creates it.
- Adds `retryTopic` to the message's extended properties.
- When the `Broker`'s flush mode is synchronous, this path keeps synchronous flushes from being batch-committed. Could that be a problem? If you know, please let me know.
- Computes `delayLevel`.
- Builds the `MessageExtBrokerInner`.

Thank you all for reading, bookmarking, and liking this article.
If any analysis here is wrong or misleading, my apologies. If convenient, let's talk it through in a 1:1 exchange.

Thanks again.