前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq——索引文件的读写机制

rabbitmq——索引文件的读写机制

作者头像
陈猿解码
发布2023-02-28 14:13:03
5980
发布2023-02-28 14:13:03
举报
文章被收录于专栏:陈猿解码陈猿解码

【前言】

在前面的文章中,我们讲解了索引文件的格式,里面提到了针对消息主要有publish,delivery,ack三个操作,而索引文件中主要也就是存储了消息这三个操作对应的二进制数据。那么什么情况下会进行索引文件的读写,具体流程又是怎样的,有些怎样的设计考虑。本文对其相关原理进行了一些总结。

【重要数据结构】

在rabbitmq内部,rabbit_queue_index模块负责队列索引文件的读写,并在内存中维护相关信息,其内部结构如下图所示:

主要的数据结构是qistate和segment。其中qistate是queue index state的缩写,包含的几个重要字段为:

dir:该队列索引信息存储路径

segments:segment的集合

journal_handle:journal.jif文件的操作句柄

max_journal_entries:日志flush到磁盘之前,允许在内存缓存的最大条数

dirty_count:没有flush到磁盘的日志条数,即当前内存缓存的日志条数

segment包含的几个重要字段是:

num:segment的编号,也是该segment写入的idx文件对应的编号,从0开始递增(每个segment中的数据会写入一个对应的后缀为idx的文件中)

path:segment对应存储文件(*.idx)的完整路径

journal_entries:一个数组,数组中的每一项记录一条消息已执行的操作(publish,delivery,ack)

entries_to_segment:也是一个数组,数组中的每一项对应一条消息已执行的操作的二进制数据

unack:该segment中已经投递给消费者,但还未收到消费者ack的消息数

file_handle_cache模块:几乎所有的文件操作都会调用该模块进行处理。对于每个将要读写的文件,除了可以控制设置对应的打开模式(仅读、仅写、读写)之外,还可以设置缓存的大小。对于写模式,所有的写入都是先直接写到缓存中,只有等缓存大小超过了设置的值的时候,才将缓存数据真正写入文件。

【写流程】

rabbit_queue_index模块提供了publish,delivery,ack接口供调用,这三个接口的处理流程差不多都一样:首先将对消息的操作序列化成二进制数据,并追加写入日志文件journal.jif中;然后根据消息的序号,找到消息对应存储的segment,并在segment中journal_entries记录消息对应的操作,同时将二进制数据添加到entries_to_segment对应的数组项中;最后按需将segment中的数据写入相应的idx文件中。

粗略看这个流程,觉得逻辑相对简单,但仔细分析,就会有很多疑问。例如:每个操作对应的日志数据都会先写入journal.jif文件,那是否意味着每条日志数据都实时刷到磁盘上了?segment中的数据什么时候写入相应的idx文件中?segment中的数据写到idx文件后,journal.jif中的数据是否还有用,会被怎么处理?这两个文件中的数据有什么区别?如果生产者发送的一条消息立马被消费者消费了,这条消息相关的操作数据还会被写到磁盘上吗?

阅读相关源码后,让我们一个一个来看这些问题。

  • 是否消息的每个操作日志数据都实时写入journal.jif文件中了?

其实写journal.jif文件时并非真正将内容写入到文件中了。前面讲到了文件的读写操作都通过file_handle_cache模块处理。对journal.jif文件的操作,具体是以写模式打开,同时缓存大小设置为无穷大。因此,在索引模块中,对消息的每个操作日志数据进行的写(journal.jif文件)操作,最终都只是在内存中缓存,并没有真正进行文件系统级别的写操作。只有等关闭该文件,或者显示调用将内存数据同步刷到磁盘时,才进行真正的操作。

  • 什么时候将segment中的数据写入idx文件呢?

前面提到了qistate中有个dirty_count字段,表示未写入磁盘的日志数量。消息每个操作的日志数据写journal.jif时,该字段加1;当累计到指定值时,会将当前segment中的数据写入相应idx文件中。这个值默认大小为32768,也就是队列消息的publish、delivery、ack操作累计达到32768次后,将segment中的数据写入idx文件中。

可通过queue_index_max_journal_entries进行配置。

除此之外,队列会根据内存的使用情况,动态决定在内存驻留的消息数(初始为无穷大),如果dirty_count超过了内存允许驻留的消息数,也会触发将segment中的数据写入idx文件中。

另外,idx文件和journal.jif文件一样,以写模式打开并设置缓存为无穷大,写入的消息都先在缓存中,但idx文件写完后会立即关闭该文件,这样缓存中的数据会通过writev进行文件系统级别的写动作,并最终通过fsync同步刷到磁盘上。

考虑这么一种情况:当内存足够(保证队列允许在内存驻留的消息数足够大)时,仅向队列发送一条(持久化的)消息,此时上面提到的两个条件均无法满足,是否这种情况下,消息就永远不会写入到磁盘上了?

答案当然是否定的!

队列收到消息后,其对应的处理进程会设置一个定时器,定时器超时后会触发rabbit_queue_index模块将journal.jif中的数据同步刷到磁盘中(注意:这里仅仅是将journal.jif缓存中的数据写入文件中,并没有写idx文件)。另外,当队列进程持续一段时间没有收到任何消息时,会进入hibernate阶段,这个时候会触发将segment中的数据写入idx文件中。

  • 数据写入idx了后,journal.jif中的数据怎么处理?

当segment中的数据写入idx文件后,会清空journal.jif对应缓存中的数据,同时通过ftruncate对文件清空,并将文件偏移位置移动到起始位置。也就是说数据写入idx之后,journal.jif中的数据就没有用了,直接进行清除处理。

  • idx文件与journal.jif文件中的数据有什么区别呢?

两个文件中的数据几乎没有区别,都是消息操作的二进制数据。唯一的差别在于消息的ID的记录。在journal.jif中记录的是消息在队列中的序号,而在idx文件中记录的是消息在该文件中的序号(也就是journal_entries数组中的下标)。

因为journal.jif文件只有一个,里面记录了所有消息的操作,因此需要记录消息在队列中的序号,保证消息被有序消费;而每个idx文件中,固定存储16384个消息的操作日志数据,对消息在队列中的序号除16384再求余,就计算出应该存储在哪个segment中数组的哪个位置了,因此存储的时候只需要存储在该文件中的相对序号即可。

  • 生产者发送的一条消息如果立马被消费者消费了,该消息相关的操作数据是否还会写到磁盘上?

前面讲到了segment中的journal_entries是一个数组,记录该segment中每个消息已执行到操作。当消息最终被消费者ack后,rabbit_queue_index会将journal_entries中该消息相关的操作清空,同时entries_to_segment中对应的二进制数据也会被清除,这意味着将segment中的数据写入到idx文件的时候,该消息因为没有任何操作日志,所以针对该消息也就不会有任何数据写入到idx文件中。

假如某个时刻,消息还未被ack,此时触发写idx文件的话,该消息的publish,delivery对应的二进制数据会被写入idx文件中;下一时刻,如果对该消息进行了ack,再次写idx文件时,ack操作对应的二进制数据可能也会被追加写入对应的idx文件中。

那有没有完全不需要写idx文件的时候呢?或者说idx文件什么情况下会被删除呢?

segment数据结构中有个unack字段,表示该segment中未被ack的消息数。publish一条消息时,unack的值加1;delivery一条消息时,unack的值不变;ack一条消息时,unack的值减1。每当写idx文件时,会先判断unack的值,如果该值为0,意味着该segment中所有的消息都已经ack了,消息也没必要进行存储了,此时不但不会有任何的写文件动作,还会直接删除该文件(之前可能有写入)。

【读流程】

相比写流程,索引文件的读流程要简单很多。rabbit_queue_index模块对外提供的读接口是读取一个序号范围段内的所有消息。实际处理过程中,根据起始序号找到对应的segment,先从segment对应的idx文件中读取(读取整个文件的内容,并解析得到一份segment),然后与内存中segment的信息进行合并并更新,得到最终的消息集合,这样就完成了读操作。

【strace跟踪验证】

在阅读源码的过程中,也用strace分析了实际的读写情况,以下面两个场景举例

  • 发送一条消息到队列时的情况:
代码语言:javascript
复制
# 打开journal.jif
[pid  1230] 09:08:03.361755 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/journal.jif", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  1230] 09:08:03.361870 <... open resumed> ) = 14 <0.000070>
[pid  1230] 09:08:03.361936 fstat(14,  <unfinished ...>
[pid  1230] 09:08:03.361997 <... fstat resumed> {st_mode=S_IFREG|0644, st_size=0, ...}) = 0 <0.000040>
[pid  1230] 09:08:03.363213 lseek(14, 0, SEEK_END <unfinished ...>
[pid  1230] 09:08:03.363298 <... lseek resumed> ) = 0 <0.000069>
# 定时器超时(200毫秒) 将缓存中的数据写入文件中
[pid  1230] 09:08:03.568104 writev(14, [...], 2 <unfinished ...>
[pid  1230] 09:08:03.568235 <... writev resumed> ) = 1249 <0.000104>
# 队列进程进入hibernate, 写idx文件
[pid  1230] 09:08:05.166167 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  1230] 09:08:05.166329 <... open resumed> ) = 15 <0.000107>
[pid  1230] 09:08:05.166465 fstat(15,  <unfinished ...>
[pid  1230] 09:08:05.166567 <... fstat resumed> {st_mode=S_IFREG|0644, st_size=0, ...}) = 0 <0.000076>
[pid  1230] 09:08:05.174817 lseek(15, 0, SEEK_END <unfinished ...>
[pid  1230] 09:08:05.174942 <... lseek resumed> ) = 0 <0.000104>
[pid  1230] 09:08:05.177466 writev(15, [...], 2 <unfinished ...>
[pid  1230] 09:08:05.177560 <... writev resumed> ) = 1243 <0.000078>
[pid  1230] 09:08:05.178717 fsync(15 <unfinished ...>
[pid  1230] 09:08:05.179367 <... fsync resumed> ) = 0 <0.000608>
[pid  1230] 09:08:05.180224 close(15 <unfinished ...>
[pid  1230] 09:08:05.180295 <... close resumed> ) = 0 <0.000049>
# idx文件写完后, 清理journal.jif文件
[pid  1230] 09:08:05.181555 lseek(14, 0, SEEK_SET <unfinished ...>
[pid  1230] 09:08:05.181644 <... lseek resumed> ) = 0 <0.000071>
[pid  1230] 09:08:05.182545 lseek(14, 0, SEEK_CUR <unfinished ...>
[pid  1230] 09:08:05.182632 <... lseek resumed> ) = 0 <0.000071>
[pid  1230] 09:08:05.182688 ftruncate(14, 0 <unfinished ...>
[pid  1230] 09:08:05.182794 <... ftruncate resumed> ) = 0 <0.000082>
  • 持续生产消费的情况(队列中几乎无堆积)
代码语言:javascript
复制
# 写 0.idx
[pid  4486] 09:43:13.814236 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:13.814423 <... open resumed> ) = 73 <0.000129>
[pid  4486] 09:43:13.835035 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.835263 <... writev resumed> ) = 39776 <0.000199>
[pid  4486] 09:43:13.835929 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.836065 <... writev resumed> ) = 39776 <0.000108>
[pid  4486] 09:43:13.836716 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.836869 <... writev resumed> ) = 39776 <0.000127>
[pid  4486] 09:43:13.837491 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.837619 <... writev resumed> ) = 39776 <0.000099>
[pid  4486] 09:43:13.838277 writev(73, [...], 64 <unfinished ...>
[pid  4486] 09:43:13.838402 <... writev resumed> ) = 39776 <0.000097>
......
[pid  4486] 09:43:13.899874 writev(73, [...], 2 <unfinished ...>
[pid  4486] 09:43:13.899938 <... writev resumed> ) = 1243 <0.000047>
[pid  4486] 09:43:13.900538 fsync(73 <unfinished ...>
[pid  4486] 09:43:13.914344 <... fsync resumed> ) = 0 <0.013774>
[pid  4486] 09:43:13.915335 close(73 <unfinished ...>
[pid  4486] 09:43:13.915464 <... close resumed> ) = 0 <0.000111>
# 清空journal.jif
[pid  4486] 09:43:13.917176 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:13.917358 ftruncate(68, 0 <unfinished ...>

# 写 0.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:15.358110 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:15.469725 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:15.469913 ftruncate(68, 0 <unfinished ...>


# 写 0.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:17.671522 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:17.789003 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4486] 09:43:17.789243 ftruncate(68, 0 <unfinished ...>

# 写0.idx与1.idx 然后清空journal.jif(省略了中间writev)
[pid  4486] 09:43:20.321408 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/1.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4486] 09:43:20.340966 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>
[pid  4483] 09:43:20.473029 lseek(68, 0, SEEK_CUR <unfinished ...>
[pid  4483] 09:43:20.473272 ftruncate(68, 0 <unfinished ...>

# 写 1.idx
[pid  5082] 09:46:17.679119 open("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/1.idx", O_RDWR|O_CREAT, 0666 <unfinished ...>

# 删除 0.idx
[pid  5082] 09:46:17.693002 stat("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx",  <unfinished ...>
[pid  5082] 09:46:17.693267 access("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", R_OK <unfinished ...>
[pid  5082] 09:46:17.693453 access("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx", W_OK <unfinished ...>
[pid  5082] 09:46:17.694280 unlink("/opt/rabbitmq_server-3.7.14/var/lib/rabbitmq/mnesia/spurs/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/F0ROKXGRLN5MYA44OI85RU72B/0.idx" <unfinished ...>

从strace抓到的系统调用来看,上述的分析流程与实际情况都是能对应起来的,同时也有些细节要注意,例如:

  • 在持续生产、消费时,journal.jif文件几乎不会有写入的动作,真正写入的只有idx文件。
  • 在每次dirty_count达到指定次数触发将segment中的数据写入idx文件时,可能会一次写多个idx文件。
  • 每次写idx文件的数据量是不确定的

【总结】

本文主要对索引文件的读写流程进行了说明,同时也通过strace进行了验证,了解这些原理后会有助于进行性能调优。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档