前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ给broker发送消息确定Commitlog的写入的位置

RocketMQ给broker发送消息确定Commitlog的写入的位置

作者头像
CBeann
发布2023-12-25 19:17:10
1150
发布2023-12-25 19:17:10
举报
文章被收录于专栏:CBeann的博客CBeann的博客

问题

有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?

文件格式概述

commitlog消息单元存储结构

commitlog中存储的是客户端发送的所有数据

在这里插入图片描述
在这里插入图片描述

ConsumeQueue消息单元存储结构

ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。

在这里插入图片描述
在这里插入图片描述

流程图

!](https://img-blog.csdnimg.cn/88343ee4ae264e889c1a0b9fce4f6f3b.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQ0JlYW5u,size_20,color_FFFFFF,t_70,g_se,x_16)
!](https://img-blog.csdnimg.cn/88343ee4ae264e889c1a0b9fce4f6f3b.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQ0JlYW5u,size_20,color_FFFFFF,t_70,g_se,x_16)

源码跟踪(broker启动流程里)

入口方法

DefaultMessageStore###load

代码语言:javascript
复制
public boolean load() {
        boolean result = true;

        try {
           //省略
           
            // 装载Commit Log
            result = result && this.commitLog.load();

           

            if (result) {
               
                //省略

                //确定Commit Log文件下一个写的位置
                this.recover(lastExitOK);

                
            }
        } catch (Exception e) {
           
        }

        

        return result;
    }

装载commitlog:把commitlog中下的文件都映射成MappedFile,方便读写

CommitLog###load

代码语言:javascript
复制
public boolean load() {
        //跟进去,调用mappedFileQueue.load方法
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }

MappedFileQueue###load方法:在该方法中把commitlog下的文件映射成MappedFile

代码语言:javascript
复制
public boolean load() {
        //window上默认的目录:C:\Users\25682\store\commitlog
        File dir = new File(this.storePath);
        //上面目录下子文件
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    //把每一个文件映射成MappedFile对象,方便读取
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    //此时wrotePosition设置的为mappedFileSize,不准确
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

此时CommitLog下的MappedFile的wrotePosition设置为mappedFileSize,但是最后这个MappedFile的wrotePosition还不对,因此下面需要修改

确定Commitlog要写的位置

DefaultMessageStore###recover

代码语言:javascript
复制
private void recover(final boolean lastExitOK) {
        //从ConsumeQueue中获取最大的物理偏移量
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            //
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }

        this.recoverTopicQueueTable();
    }

DefaultMessageStore###recoverConsumeQueue:获取每一个主题里每一个队列里的最大commitlog偏移量

代码语言:javascript
复制
private long recoverConsumeQueue() {
        long maxPhysicOffset = -1;
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                logic.recover();
                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                    maxPhysicOffset = logic.getMaxPhysicOffset();
                }
            }
        }

        return maxPhysicOffset;
    }

CommitLog###recoverAbnormally

代码语言:javascript
复制
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            int index = mappedFiles.size() - 1;
            //获取最后一个CommitLog的MapperFile
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }

            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                //不断从MapperFile中根据CommitLog的数据单元格式读取数据,当读取到数据为0时,跳出循环,说明该位置为下个需要写的位置
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                if (dispatchRequest.isSuccess()) {
                    // Normal data
                    if (size > 0) {
                        mappedFileOffset += size;

                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                                this.defaultMessageStore.doDispatch(dispatchRequest);
                            }
                        } else {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    }
                   
                    else if (size == 0) {
                        index++;
                        if (index >= mappedFiles.size()) {
                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                            break;
                        } else {
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next physics file, " + mappedFile.getFileName());
                        }
                    }
                } else {
                    log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                    break;
                }
            }
           
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
             //该位置为真正要插入的位置,所以修正上面的设置的错误的wrotePosition
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        }
        // Commitlog case files are deleted
        else {
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

结论

CommitLog一开始是把wrotePosition设置为CommitLog文件的大小,这样只有最后一个CommitLog的wrotePosition的数据是不正确的,所以后面在确定最后一个CommitLog的wrotePosition的时候是通过读取CommitLog文件里的数据来确定wrotePosition位置的,因为CommitLog里前四个字节代表这条消息的大小,这样我读取前四个字节以后就可以读取这一条数据,然后以此类推,当读取消息的大小为0时,代表此处没有消息,则确定wrotePosition的位置。

参考

CommitLog格式 https://blog.csdn.net/meilong_whpu/article/details/76919267 ConsumeQueue格式 https://www.cnblogs.com/gaojy/p/15087869.html

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
  • 文件格式概述
    • commitlog消息单元存储结构
      • ConsumeQueue消息单元存储结构
      • 流程图
      • 源码跟踪(broker启动流程里)
        • 入口方法
          • 装载commitlog:把commitlog中下的文件都映射成MappedFile,方便读写
            • 确定Commitlog要写的位置
            • 结论
            • 参考
            相关产品与服务
            对象存储
            对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档