前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列之-rocketmq文件数据存储

面试系列之-rocketmq文件数据存储

作者头像
用户4283147
发布2022-12-29 20:05:29
5960
发布2022-12-29 20:05:29
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

文件系统的结构设计

CommitLog

Broker上的Topic上的消息都会顺序的写入到commitlog文件下,然后再异步转存到consumequeue以及indexFile文件;该消息的元信息存储着消息所在的Topic与Queue,当消费者要进行消费时,会通过ConsumerQueue文件来找到自己想要消费的队列;该队列不存储具体的消息,而是存储消息的基本信息与偏移量。消费者通过偏移量去CommitLog中找到自己需要消费的信息然后取出,就可以进行消费;并且Broker还可以对CommitLog来建立Hash索引文件IndexFile,这样就可以通过消息的key来找到消息;

consumeQueue

RocketMQ在为每个Topic在~/store/consumeQueue中创建一个目录,目录名称就是Topic的名称,在该Topic下会为每个Queue创建独立的目录,目录名为QueueId,每个目录中存放着若干的consumeQueue文件,consumeQueue是commitlog的索引文件,可以通过consumeQueue定位到commitlog的具体消息;

RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在CommitLog文件中,试想一下如果消息消费者直接从消息存储文件(CommitLog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ为了适应消息消费的检索需求,设计了消息消费队列文件(ConsumeQueue),该文件可以看成是CommitLog关于消息消费的“索引”文件,消息主题,第二级目录为主题的消息队列;

indexFile

除了通过正常的指定Topic进行消息消费外,RocketMQ还提供了一种根据key进行消息查询的功能,该查询通过store目录中的index子目录中的indexFile进行索引实现查询的,当Broker收到包含key的消息时这个消息索引就会被写入indexFile,如果消息没key不会写入;

IndexFile写入步骤
  1. 如果当前已使用条目大于等于允许最大条目数时,则返回false,表示当前索引文件已写满。如果当前索引文件未写满则根据key算出key的hashcode,然后keyHash对hash槽数量取余定位到hashcode对应的hash槽下标, hashcode对应的hash槽的物理地址 = IndexHeader 头部(40字节) + 下标 * 每个hash槽的大小(4字节);
  2. 读取hash槽中存储的数据,如果hash槽存储的数据小于0或大于当前索引文件中存储的最大条目,则将该槽的值设置为0;
  3. 将条目信息存储在IndexFile中;
  • 计算新添加条目的起始物理偏移量,等于头部字节长度+ hash槽数量单个hash槽大小(4个字节) + 当前Index条目个数单个Index 条目大小(20个字节);
  • 依次将hashcode、消息物理偏移量、时间差timeDif、原来Hash槽的值存入该索引条目中;
  • 将新添加的索引条目索引存入hash槽中,覆盖原来的值;
  1. 更新文件索引头信息;
mappedFile类

对于commitlog、 consumequeue、 index三类大文件进行磁盘读写操作,均是通过MapedFile类来完成。这个类相当于MappedByteBuffer的包装类;MappedFile是对一个物理文件的抽象,MappedFileQueue是一组文件的队列,MappedByteBuffer 则是一个映射到物理文件所对应的page cache的一块内存;

mappedByteBuffer

mappedByteBuffer是一块映射到CommitLog文件的内存(具体了解mmap);准确来讲,它占用了page cache的一部分,也就是说写入这里的文件可以免去 从用户空间到内核空间一次拷贝成本,这叫做 零拷贝(zero-copy) ;

逻辑存储层刷盘策略

RocketMQ 的存储与读写是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘;如果是同步刷盘,消息追加到内存后,将同步调用 MappedByteBuffer 的 force方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端;RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH (异步刷盘)、SYNC_FLUSH (同步刷盘),默认为异步刷盘; ConsumeQueue、IndexFile刷盘的实现相对于CommitLog刷盘机制来说都很简单,ConsumeQueue是周期性刷盘,索引文件的刷盘并不是采取定时刷盘机制,而是每次想要更新一次索引文件就会将之前的改动刷写到磁盘;

因为操作系统PageCache的存在,PageCache是OS对文件的缓存,用于加速对文件的读写,所以一般都是先写入到PageCache中,然后再持久化到磁盘上。我们熟悉的其他组件,MySQL、Redis等都是如此,RocketMQ也不列外。在 RocketMQ中提供了同步刷盘异步刷盘两种刷盘方式,可以通过broker.conf配置中的flushDiskType参数来设置(SYNC_FLUSHASYNC_FLUSH);

同步刷盘

在消息追加到内存映射文件的内存中后,立即将数据从内存刷写到磁盘文件,CommitLog中有一个刷盘服务 GroupCommitService,所有消息发送线程接收到的同步写入请求,最终都会以请求-回应的方式通知 GroupCommitService 代其进行刷盘操作。当 GroupCommitService 执行完刷盘任务,或者刷盘任务执行超时时,发送线程才会回复消息的 Producer;消息写入内存的PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态,这种方式可以保证数据绝对安全,但是吞吐量不大;

异步刷盘(默认)

异步刷盘根据是否开启transientStorePoolEnable机制,刷盘实现会有细微差别。如果transientStorePoolEnable为true, RocketMQ会单独申请一个与目标物理文件 (CommitLog) 同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到FileChannel中,再flush到磁盘。 如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中;

当transientStorePoolEnable为true时,会有一个CommitRealTimeService默认每隔200ms将直接内存中的数据提交到FileChannel,一次提交默认至少要包含4个页的数据,否则暂时不提交;当transientStorePoolEnable为false时,这个CommitRealTimeService实际上什么都没做;

然后是定时刷盘的逻辑,CommitLog会有一个FlushRealTimeService定时将数据刷入磁盘,默认每隔10s进行一次刷盘,和commit过程一样,刷盘阶段默认也是至少攒够4个页的脏数据才进行刷盘,当transientStorePoolEnable为true时,刷盘过程调用的是FileChannel的force,否则调用的是MappedByteBuffer的force;

消息写入到内存的PageCache中,就立刻给客户端返回写操作成功,当PageCache中的消息积累到一定的量时,触发一次写操作,将PageCache中的消息写入到磁盘中;这种方式吞吐量大,性能高,但是PageCache中的数据可能丢失,不能保证数据绝对的安全;

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三个值中的一:

代码语言:javascript
复制
brokerId=0 代表主
brokerId=1 代表从(大于 0 都代表从) 
brokerRole=SYNC_MASTER 同步复制(主从) 
brokerRole=ASYNC_MASTER 异步复制(主从) 
flushDiskType=SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 异步刷盘

配置参数及意义

文件过期删除机制

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载CommitLog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引人一种机制来删除己过期的文件; RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新;RocketMQ清除过期文件的方法是: 如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时 ,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时;RocketMQ会每隔10s调度一次清除过程,检测是否需要清除过期文件;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文件系统的结构设计
    • CommitLog
      • consumeQueue
        • indexFile
          • IndexFile写入步骤
        • mappedFile类
          • mappedByteBuffer
          • 逻辑存储层刷盘策略
            • 同步刷盘
              • 异步刷盘(默认)
              • 配置参数及意义
              • 文件过期删除机制
              相关产品与服务
              对象存储
              对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档