前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >从源码剖析RocketMQ如何高效且持久化处理消息

从源码剖析RocketMQ如何高效且持久化处理消息

原创
作者头像
潋湄
修改2025-01-10 20:35:46
修改2025-01-10 20:35:46
1670
举报
文章被收录于专栏:消息队列

RocketMQ作为消息队列的典型代表,其在高并发状况下处理消息又很不错的性能,同时又能够通过将消息持久化到磁盘确保消息不会丢失,本文旨在从RocketMQ的源码剖析为何它能够高效处理消息,并且又如何高效组织消息并写入磁盘

在阅读本文之前,建议不了解RocketMQ基础架构的小伙伴先看一下这篇文章了解一下:

https://cloud.tencent.com/developer/article/2485324

消息Message存储的地点——CommitLog

在RocketMQ中,消息Message投递到Broker后,会存储到CommitLog中,然后通过CommitLog存储到磁盘中,而CommitLog最后会被持久化到磁盘中,因此也就确保了即使写入消息的时候机器发生宕机,其中的消息也不会丢失:

Broker将消息存储到commit log中
Broker将消息存储到commit log中

而最终存储到磁盘中的commit log是以文件偏移量命名,那么Broker能做到持久化处理消息,又是如何高效地对消息进行读取呢?

如果你对于MySQL存储数据的流程比较清晰,应该能够联想到,MySQL在对数据进行存取时,并不会直接将数据存到磁盘上,而是先写入到缓冲区中,之后将缓冲区的数据持久化到磁盘的日志中,与MySQL类似,RocketMQ在对消息写入时,也引入了内存来高效处理消息,不过这里应用到的是mmap技术,将磁盘中文件对应页表的物理地址以虚拟地址映射到内存中,这样能够使得系统在获取数据时直接拿到数据在磁盘的物理地址,另外还会开启一个专门将内存数据持久化到磁盘的服务,从而完成对于消息的持久化操作

接下来就让我们从源码剖析,RocketMQ是如何通过将消息存储到CommitLog中,并且建立了与内存之间的地址映射

Broker接收到Message进行处理

Broker在接收到Message后,会为要写入的CommitLog生成对应偏移量的文件名,那么这个偏移量是如何生成的呢?让我们一步步探究:

Processor处理并校验参数

Broker在初始化时,会初始化各个Processor来完成对于消息的接收以及处理消息等功能,而其中负责接收消息的Processor为SendMessageProcesser

代码语言:java
复制
    /**
     * 包名: broker
     * 类名: BrokerController
     * 方法名: registerProcessor
     * 行数:545
     */
    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

在这个类中有很多接收Message的异步处理方法:

代码语言:java
复制
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    private List<ConsumeMessageHook> consumeMessageHookList;

    public SendMessageProcessor(final BrokerController brokerController) {
        super(brokerController);
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request);

    @Override
    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback);

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request)  {
          ......
          return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
          ......
                                                                  }

Broker会通过asyncProcessRequest在其内部异步处理消息,调用asyncSendMessage来调用MessageStore存取消息,在调用之前,它会进行一系列的参数校验:

代码语言:java
复制
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        ......
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }
        ......
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        ......
}

MessageStore写入数据

MessageStore是存取消息的核心类,它是Broker启动时第一个启动的Processor类,它其中最主要处理消息的方法就是asyncPutMessage

代码语言:java
复制
类名:DefaultMessageStore
@Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        ......
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
        ......
    }

因此,消息存取通过层层封装,消息被Broker处理的逻辑大致如下所示:

Broker接收消息逻辑
Broker接收消息逻辑

我们可以看到,在这个方法中,Message会移交给commitLog来完成最终的磁盘存储,但是,如果我们深究commitLog底层存储消息的代码,我们可以看到真正写入数据的地方在这里:

代码语言:java
复制
类名:CommitLog
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ......
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;

    // Here settings are stored timestamp, in order to ensure an orderly
    // global
    msg.setStoreTimestamp(beginLockTimestamp);

    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    ......
}

我们发现,这里有多了一个操作消息数据的类MappedFile,也许你会感到困惑,这又是什么类,没关系,接下来我们介绍一下这个类:

CommitLog的文件组织形式——MappedFile

MappedFile介绍

我们知道CommitLog在磁盘上存储文件名时文件对应的偏移量,而MappedFile就可以理解为对一个个磁盘文件的封装,它是CommitLog存储到磁盘上的文件单元形式,也就是说,一份CommitLog中可以包含多个MappedFile,而如果仅仅包含多个MappedFile不好管理,因此在RocketMQ中单独定义了MappedFileQueue来管理一个CommitLog包含的多个MappedFile,而在MappedFileQueue中,则是很简便地用了一个List来存储MappedFile

代码语言:java
复制
public class CommitLog {
    protected final MappedFileQueue mappedFileQueue;
}

public class MappedFileQueue {
    protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
}

通过这样的组织方式,便能够存储多个MappedFile的同时很好地管理这些存储文件,而我们再看上面初始化mappedFile时调用的getLastMappedFile方法,也能够很好地理解,它其实就是去除当前列表中的最后一个元素:

代码语言:java
复制
    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

MappedFile的初始化命名

在Broker初始化的时候,此时MappedFile并不会立即创建,而是如我们看到的,在调用getLastMappedFile方法时,从保存了MappedFile的列表中取出对象,但是如果此时列表中没有数据,那么此时是如何取出对象的呢?

我们细究源码可以发现还有另外一个getLastMappedFile负责返回一个对象文件:

代码语言:java
复制
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            return tryCreateMappedFile(createOffset);
        }

        return mappedFileLast;
    }

在这个方法中,如果MappedFile列表为空或者存储的文件已满,就会调用tryCreateMappedFile方法来初始化一个文件对象进行返回:

代码语言:java
复制
protected MappedFile tryCreateMappedFile(long createOffset) {
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
          + this.mappedFileSize);
    return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

我们可以发现,最终创建文件还是通过doCreateMappedFile方法执行,在这个方法中,会传递下一个文件路径以及下下个文件路径,为什么在这里要传递到下下个文件路径呢?我们再往下追源码:

代码语言:java
复制
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;

    if (this.allocateMappedFileService != null) {
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
    } else {
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }

    if (mappedFile != null) {
        if (this.mappedFiles.isEmpty()) {
            mappedFile.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(mappedFile);
    }

    return mappedFile;
}

我们看到,在这个方法中,如果当前创建文件的服务allocateMappedFileService不为空,会继续调用服务的方法创建文件,该方法的主要作用就是创建了两个文件,一个是nextFilePath对应的当前要使用的文件,另一个就是nextNextFilePath对应的下一个文件,之所以创建两个文件的目的就在于:

如果当前的文件对象写满了,可以直接通过列表进行获取切换到下一个文件对象继续写入数据,从而减少再次调用服务创建文件对象的开销

到了这里,创建MappedFile并将数据写入CommitLog就结束了,接下来让我们看看·CommitLog`最终是如何写入磁盘的

将CommitLog写入磁盘

我们看完上面的创建MappedFile代码知道,创建文件并完成落盘的核心代码就在putRequestAndReturnMappedFile中,而我们追到这个方法里,可以看到这样的代码:

代码语言:java
复制
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        ......
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        boolean offerOK = this.requestQueue.offer(nextReq);
        ......
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        boolean offerOK = this.requestQueue.offer(nextNextReq);
        ......
}

你看到这里可能会疑惑,不是要将文件持久化到磁盘吗,怎么一直在向requestTable与·requestQueue中放入数据,没错,这段代码写了一百多行,其实就在向请求表请求队列中放入新创建的文件信息,那落盘代码在哪里呢?

通过内存队列创建文件

我们可以看到,在上面的创建文件方法中,它仅仅是将创建文件的请求以及实际文件名放入了请求队列与请求表中,而这些数据是暂时保存在内存队列中的,而真正处理这些请求完成创建文件操作的核心,我们还是要提及前面介绍的存取文件的核心:MessageStore

MessageStore初始化启动时,会初始化一个服务allocateMappedFileService,这个服务见名知意,它就是主要负责创建MappedFile的类,而在调用this.allocateMappedFileService.start()启动后,start方法会执行下面的操作:

代码语言:java
复制
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped() && this.mmapOperation()) {

    }
    log.info(this.getServiceName() + " service end");
}

在这个方法中,会调用一个核心函数mmapOperation,这个函数会将存到请求队列中的请求取出进行处理:

代码语言:java
复制
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
    ......
}

我们可以看到,这个方法会取出请求队列中的请求进行消费,那么此时我们思考一下,既然这里的allocateMappedFileService是消费者,那么真正的生产者是谁?

到这里我们再回看一下源码:

代码语言:java
复制
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);

我们可以看到,通过调用本类中实例化的allocateMappedFileService对象来将创建文件的请求放入请求队列中,因此,这个内存队列的生产者消费者其实allocateMappedFileService自己!

自产自销的allocateMappedFileService
自产自销的allocateMappedFileService

那么,创建文件的关键函数在哪里呢,我们继续往下看:

创建文件映射的关键对象FileChannel

注意这里,我并没有说FileChannel创建文件的核心对象,它只是建立了一个文件映射,后续向CommitLog中写入文件是依靠它来进行,我们继续上面的代码可以看到,mappedFile通过init函数完成初始化:

代码语言:java
复制
private boolean mmapOperation() {
    ......
    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
......
}

我们继续看init方法:

代码语言:java
复制
private void init(final String fileName, final int fileSize) throws IOException {
    ......
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    ......
}

我们可以发现,在这里会初始化fileChannel对象,之后又通过fileChannel方法将mappedByteBuffer进行map映射,这里就是借鉴了mmap思想建立了一个磁盘文件页表的物理地址内存虚拟地址的直接映射,可以通过建立这种映射提高对于磁盘文件的操作效率,减少从内核态到用户态的转换次数,这也是RocketMQ具有高性能的一个底层原因之一,而我们熟知的高性能消费队列Kafka也是通过mappedByteBuffer直接建立磁盘文件与内存的映射,从而提高处理消息性能

将CommitLog持久化到磁盘(最终版)

至此,我们完成了文件命名文件内存关系映射参数校验等等一系列工作,开始将commitLog中的内容写入磁盘,因为我们通过mappedByteBuffer建立了内存地址与磁盘文件的映射关系,所以与MySQL的写入数据思想类似,直接将内存中的数据持久化到磁盘中即可,首先我们来看将消息数据写入缓存中的操作,在Broker内部处理消息的asyncPutMessage方法中,会调用appendMessage将文件追加到缓存中:

代码语言:java
复制
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { 
    ......
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    ......
}

appendMessage的具体执行逻辑如下:

代码语言:java
复制
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

在这里会有许多逻辑判断,但是最终目的就是将文件写入到缓存中,而将缓存持久化到磁盘的服务则是由FlushRealTimeService来完成:

代码语言:java
复制
class FlushRealTimeService extends FlushCommitLogService {
    public void run() {
        ......
        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        ......
    }
}

这个对象会定期每500ms将4页的数据刷回磁盘中,而这个任务又会交给mappedFileQueue来完成,它执行的逻辑如下:

代码语言:java
复制
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

文件写入磁盘的偏移量即mappedFile的文件名,最终就完成了写入磁盘

综上所述,其实将消息数据写入磁盘的大致流程如下:

将消息数据写入磁盘的大致过程
将消息数据写入磁盘的大致过程

而写入过程经过的对象可以表示如下

CommitLogMappedFileQueueMappedFilemappedByteBuffermmap磁盘

如果你能看到这里,想必应该收获很多吧,不得不说,虽然RocketMQ写入消息的流程复杂,但是终究还是抓住了mmap映射物理数据页表地址的高效操作方法,与通过将缓存持久化磁盘的持久化方法,所以只要把其核心思想与原理把握住,其他就是通过一些数据结构组织来实现的问题了

希望对你有所帮助!!!另外,创作不易,如果能给个赞就更好了!!!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息Message存储的地点——CommitLog
  • Broker接收到Message进行处理
    • Processor处理并校验参数
      • MessageStore写入数据
      • CommitLog的文件组织形式——MappedFile
        • MappedFile介绍
          • MappedFile的初始化命名
          • 将CommitLog写入磁盘
            • 通过内存队列创建文件
              • 创建文件映射的关键对象FileChannel
                • 将CommitLog持久化到磁盘(最终版)
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档