rmq_sys_wheel_timer
rmq_sys_wheel_timer
队列中TimerMessageStore
消费队列数据,将数据消费到 timerWheel
使用时间轮算法,实现秒级任务store\consumequeue\rmq_sys_wheel_timer
从队列中读取消息, 提取数据存到 timerlog
与 timerwheel
中store\checkpoint
对应 TimerMessageStore#timerCheckpoint
lastReadTimeMs
上次消费的时间节点lastTimerLogFlushPos
最后刷新 log的 poslastTimerQueueOffset
最后一次消费的队列节点masterTimerQueueOffset
主 Broker 的队列消费节点store\timerwheel
时间轮,内由 Slot
组成 结构如下 timeMs
消息到达时间firstPos
开始的 poslastPos
结束的 pos 在 timerLog 中读取数据, 后面会讲具体逻辑num
消息数量magic
no use now, just keep itstore\timerlog
对应 TimerMessageStore#timerCheckpoint
里边也是由多个 mappedFile
组成。
主要是存储原msg的数据,
因为从 rmq_sys_wheel_timer
消费了之后,
会存到 timerwheel
与 timerlog
中TimerMessageStore.this.enqueue
默认 100毫秒执行一次rmq_sys_wheel_timer
消费数据 ps: currQueueOffset
从 checkpoint
读取出来的enqueuePutQueue
中currQueueOffset + 1
进入下一个循环 消费下一个 offset 节点enqueuePutQueue
中的数据shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs
检查消费的消息是否已到达投递时间。 dequeuePutQueue.put(req);
中doEnqueue
-> timerWheel.getSlot(delayedTime)
获取延迟时间插槽。ByteBuffer
投入 timerLog
中数据结构为:timerLog.append
返回插入位置 rettimerWheel
|消息到达时间戳|firstPos|ret (timerLog.append返回位置)| 消息数量| 0|timerWheel
中的数据currReadTimeMs
来获取 timerWheel
插槽数据 currReadTimeMs
初始化的时候 timerCheckpoint.getLastReadTimeMs()
读取的是上次最后消费的数据currReadTimeMs
会按照上一次宕机的时间开始搜寻数据, 这样子宕机消息也不会丢失。会在启动的那段时间被投递出去currReadTimeMs
在 moveReadTime
方法中会自增timerWheel.getSlot(currReadTimeMs);
读取插槽数据 long currOffsetPy = slot.lastPos;
读取插槽属性, 最后一个pos节点timerLog.getWholeBuffer(currOffsetPy)
根据 currOffsetPy
获取 SelectMappedBufferResult
timerLog
的 SelectMappedBufferResult
中获取数据。 prevPos
上一个节点数据enqueueTime
放入 timerLog 的时间delayedTime
消息到达时间戳offsetPy
commitLog的数据位置sizePy
commitLog的数据大小TimerRequest
讲消息投递到 dequeueGetQueue
中currOffsetPy = prevPos
将位置移动到前一个,进行遍历TimerDequeueGetMessageService
实例同时消费 dequeueGetQueue
getMessageByCommitOffset
从 commitLog
中读取原投递的消息数据uniqkey
判断不在 deleteList
中的时候 将消息投递到 dequeuePutQueue
中去TimerDequeuePutMessageService
实例同时消费 dequeuePutQueue
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
将消息转换成原始的 topic 消息,清除无用属性doPut
-> messageStore.putMessage(message)
将消息投递到指定 messageQueue
中timerLog
刷盘timerWheel
刷盘timerCheckpoint
刷盘timerLog.load()
加载文件timerMetrics.load
加载文件recover
-> recoverAndRevise(lastFlushPos, true)
ps: (用于 timerWhel
跟 timerLog
的数据保持一致刷新) lastFlushPos
最后一次刷盘的位置, 其实最终是拿到 timerlog -> mappedFile
的第几个文件mappedFile
的数据timerWheel.reviseSlot
修改插槽数据。 检查这个时间的插槽是否已经有填充数据。 lastPos
(顺序遍历。这里最终还是会是最后一个 lastPos)putSlot
reviseQueueOffset(processOffset);
读取 timerLog
最后一个数据, 为了校验最后一个数据是否正常,是否能读取到消息。currQueueOffset
数据currReadTimeMs
数据