本文接《RocketMQ 源码分析 —— Message 发送与接收》。
主要解析 CommitLog
存储消息部分。
CommitLog
、MappedFileQueue
、MappedFile
的关系如下:
CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、MappedFileQueue
、MappedFile
的定义如下:
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等文件。MappedFileQueue
:MappedFile
所在的文件夹,对 MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。MappedFile
统一文件大小。CommitLog
里默认为 1GB。CommitLog
:针对 MappedFileQueue
的封装使用。CommitLog
目前存储在 MappedFile
有两种内容类型:
CommitLog
存储在 MappedFile
的结构:
MESSAGE[1] | MESSAGE[2] | ... | MESSAGE[n - 1] | MESSAGE[n] | BLANK |
---|
MESSAGE
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | MsgLen | 消息总长度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 消息内容CRC | Int | 4 |
4 | QueueId | 消息队列编号 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 消息队列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog 的顺序存储位置。 | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成消息时间戳 | Long | 8 |
10 | BornHost | 生效消息的地址+端口 | Long | 8 |
11 | StoreTimestamp | 存储消息时间戳 | Long | 8 |
12 | StoreHost | 存储消息的地址+端口 | Long | 8 |
13 | ReconsumeTimes | 重新消费消息次数 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 内容长度 + 内容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic长度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | Short + Bytes | 2 + PropertiesLength |
BLANK
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | maxBlank | 空白长度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
// 省略代码
MappedFile
,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。MappedFile
,解析解析见:MappedFile#appendMessage(...)。MappedFile
已满,创建新的,再次插入消息。Broker
主从同步。后面的文章会详细解析?。// 省略代码
MappedFile
,若不存在或文件已满,则进行创建。createOffset
。MappedFile
的文件命名规则:> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)
目前 `CommitLog` 的 `startOffset` 为 0。
此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如:
| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |
_如果有知道的同学,麻烦提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
MappedFile
是否是第一个创建的文件。该标识用于 ConsumeQueue
对应的 MappedFile
,详见 ConsumeQueue#fillPreBlank
。 // 省略代码
MappedFile
,并返回插入结果。writeBuffer != null
的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。position
,执行写入,更新 wrotePosition
(当前写入位置,下次开始写入开始位置)。// 省略代码
CommitLog
的顺序存储位置。CommitLog
里的 offsetMsgId
。这里一定要和 msgId
区分开。计算方式 | 长度 | |||
---|---|---|---|---|
offsetMsgId | Broker存储时生成 | Hex(storeHostBytes, wroteOffset) | 32 | |
msgId | Client发送消息时生成 | Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) | 32 | 《RocketMQ 源码分析 —— Message 基础》 |
BLANK
占位,返回结果。MESSAGE
。线程服务 | 场景 | 插入消息性能 |
---|---|---|
CommitRealTimeService | 异步刷盘 && 开启内存字节缓冲区 | 第一 |
FlushRealTimeService | 异步刷盘 && 关闭内存字节缓冲区 | 第二 |
GroupCommitService | 同步刷盘 | 第三 |
方式 | |||
---|---|---|---|
方式一 | 写入内存字节缓冲区(writeBuffer) | 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) | 文件通道(fileChannel)flush |
方式二 | 写入映射文件字节缓冲区(mappedByteBuffer) | 映射文件字节缓冲区(mappedByteBuffer)flush |
flush相关代码
考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE
才进行 flush
。
// 省略代码
commit相关代码:
考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE
才进行 commit
。
// 省略代码
消息插入成功时,异步刷盘时使用。
// 省略代码
flush
线程服务,调用 MappedFile#flush
相关逻辑。flushPhysicQueueThoroughInterval
周期,执行一次 flush
。因为不是每次循环到都能满足 flushCommitLogLeastPages
大小,因此,需要一定周期进行一次强制 flush
。当然,不能每次循环都去执行强制 flush
,这样性能较差。flushCommitLogTimed
参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用 commitLogService.wakeup()
。MappedFile
进行 flush
。Broker
关闭时,强制 flush
,避免有未刷盘的数据。消息插入成功时,异步刷盘时使用。
和 FlushRealTimeService
类似,性能更好。
// 省略代码
消息插入成功时,同步刷盘时使用。
// 省略代码
sync
的原因:this.requestsWrite
会和 this.requestsRead
不断交换,无法保证稳定的同步。flush
。MappedFile
(写第N个消息时,MappedFile
已满,创建了一个新的),所以需要有循环2次。CountDownLatch
实现isWaitStoreMsgOK
未设置成 TRUE
,导致未走批量提交。wakeup()
时,则会立即进行一次批量提交。当 Broker
设置成同步落盘 && 消息 isWaitStoreMsgOK=true
,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。