Broker上的Topic上的消息都会顺序的写入到commitlog文件下,然后再异步转存到consumequeue以及indexFile文件;该消息的元信息存储着消息所在的Topic与Queue,当消费者要进行消费时,会通过ConsumerQueue文件来找到自己想要消费的队列;该队列不存储具体的消息,而是存储消息的基本信息与偏移量。消费者通过偏移量去CommitLog中找到自己需要消费的信息然后取出,就可以进行消费;并且Broker还可以对CommitLog来建立Hash索引文件IndexFile,这样就可以通过消息的key来找到消息;
RocketMQ在为每个Topic在~/store/consumeQueue中创建一个目录,目录名称就是Topic的名称,在该Topic下会为每个Queue创建独立的目录,目录名为QueueId,每个目录中存放着若干的consumeQueue文件,consumeQueue是commitlog的索引文件,可以通过consumeQueue定位到commitlog的具体消息;
RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在CommitLog文件中,试想一下如果消息消费者直接从消息存储文件(CommitLog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ为了适应消息消费的检索需求,设计了消息消费队列文件(ConsumeQueue),该文件可以看成是CommitLog关于消息消费的“索引”文件,消息主题,第二级目录为主题的消息队列;
除了通过正常的指定Topic进行消息消费外,RocketMQ还提供了一种根据key进行消息查询的功能,该查询通过store目录中的index子目录中的indexFile进行索引实现查询的,当Broker收到包含key的消息时这个消息索引就会被写入indexFile,如果消息没key不会写入;
对于commitlog、 consumequeue、 index三类大文件进行磁盘读写操作,均是通过MapedFile类来完成。这个类相当于MappedByteBuffer的包装类;MappedFile是对一个物理文件的抽象,MappedFileQueue是一组文件的队列,MappedByteBuffer 则是一个映射到物理文件所对应的page cache的一块内存;
mappedByteBuffer是一块映射到CommitLog文件的内存(具体了解mmap);准确来讲,它占用了page cache的一部分,也就是说写入这里的文件可以免去 从用户空间到内核空间一次拷贝成本,这叫做 零拷贝(zero-copy) ;
RocketMQ 的存储与读写是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘;如果是同步刷盘,消息追加到内存后,将同步调用 MappedByteBuffer 的 force方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端;RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH (异步刷盘)、SYNC_FLUSH (同步刷盘),默认为异步刷盘; ConsumeQueue、IndexFile刷盘的实现相对于CommitLog刷盘机制来说都很简单,ConsumeQueue是周期性刷盘,索引文件的刷盘并不是采取定时刷盘机制,而是每次想要更新一次索引文件就会将之前的改动刷写到磁盘;
因为操作系统PageCache的存在,PageCache是OS对文件的缓存,用于加速对文件的读写,所以一般都是先写入到PageCache中,然后再持久化到磁盘上。我们熟悉的其他组件,MySQL、Redis等都是如此,RocketMQ也不列外。在 RocketMQ中提供了同步刷盘和异步刷盘两种刷盘方式,可以通过broker.conf配置中的flushDiskType参数来设置(SYNC_FLUSH、ASYNC_FLUSH);
在消息追加到内存映射文件的内存中后,立即将数据从内存刷写到磁盘文件,CommitLog中有一个刷盘服务 GroupCommitService,所有消息发送线程接收到的同步写入请求,最终都会以请求-回应的方式通知 GroupCommitService 代其进行刷盘操作。当 GroupCommitService 执行完刷盘任务,或者刷盘任务执行超时时,发送线程才会回复消息的 Producer;消息写入内存的PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态,这种方式可以保证数据绝对安全,但是吞吐量不大;
异步刷盘根据是否开启transientStorePoolEnable机制,刷盘实现会有细微差别。如果transientStorePoolEnable为true, RocketMQ会单独申请一个与目标物理文件 (CommitLog) 同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到FileChannel中,再flush到磁盘。 如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中;
当transientStorePoolEnable为true时,会有一个CommitRealTimeService默认每隔200ms将直接内存中的数据提交到FileChannel,一次提交默认至少要包含4个页的数据,否则暂时不提交;当transientStorePoolEnable为false时,这个CommitRealTimeService实际上什么都没做;
然后是定时刷盘的逻辑,CommitLog会有一个FlushRealTimeService定时将数据刷入磁盘,默认每隔10s进行一次刷盘,和commit过程一样,刷盘阶段默认也是至少攒够4个页的脏数据才进行刷盘,当transientStorePoolEnable为true时,刷盘过程调用的是FileChannel的force,否则调用的是MappedByteBuffer的force;
消息写入到内存的PageCache中,就立刻给客户端返回写操作成功,当PageCache中的消息积累到一定的量时,触发一次写操作,将PageCache中的消息写入到磁盘中;这种方式吞吐量大,性能高,但是PageCache中的数据可能丢失,不能保证数据绝对的安全;
同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三个值中的一:
brokerId=0 代表主
brokerId=1 代表从(大于 0 都代表从)
brokerRole=SYNC_MASTER 同步复制(主从)
brokerRole=ASYNC_MASTER 异步复制(主从)
flushDiskType=SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH 异步刷盘
由于RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载CommitLog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引人一种机制来删除己过期的文件; RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新;RocketMQ清除过期文件的方法是: 如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时 ,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时;RocketMQ会每隔10s调度一次清除过程,检测是否需要清除过期文件;