前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ MappedFile 预热原理解析

RocketMQ MappedFile 预热原理解析

作者头像
java404
发布2019-01-03 11:59:51
2.8K0
发布2019-01-03 11:59:51
举报
文章被收录于专栏:java 成神之路java 成神之路

创建 MappedFile 文件

创建 MappedFile 文件实现如下:

代码语言:javascript
复制
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // 从 requestQueue 阻塞队列中获取 AllocateRequest  任务。
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            // 判断是否开启 isTransientStorePoolEnable ,如果开启则使用直接内存进行写入数据,最后从直接内存中 commit 到 FileChannel 中。
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                // 使用 mmap 方式创建 MappedFile
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }
            // 判断 mappedFile 大小,只有 CommitLog 才进行文件预热
            // 预写入数据。按照系统的 pagesize 进行每个pagesize 写入一个字节数据。
            //为了把mmap 方式映射的文件都加载到内存中。
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}

从代码中可以看出,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且开启文件预热功能才会预加载文件。 CommitLog 文件的大小默认为 1 G。

文件预热

文件预热的时候需要了解的知识点 操作系统的 Page Cache 和 内存映射技术 mmap 。

Page Cache

Page Cache 叫做页缓存,而每一页的大小通常是4K,在Linux系统中写入数据的时候并不会直接写到硬盘上,而是会先写到Page Cache中,并打上dirty标识,由内核线程flusher定期将被打上dirty的页发送给IO调度层,最后由IO调度决定何时落地到磁盘中,而Linux一般会把还没有使用的内存全拿来给Page Cache使用。而读的过程也是类似,会先到Page Cache中寻找是否有数据,有的话直接返回,如果没有才会到磁盘中去读取并写入Page Cache然后再次读取Page Cache并返回。而且读的这个过程中操作系统也会有一个预读的操作,你的每一次读取操作系统都会帮你预读出后面一部分数据,而且当你一直在使用预读数据的时候,系统会帮你预读出更多的数据(最大到128K)。

mmap

mmap是一种将文件映射到虚拟内存的技术,可以将文件在磁盘位置的地址和在虚拟内存中的虚拟地址通过映射对应起来,之后就可以在内存这块区域进行读写数据,而不必调用系统级别的read,wirte这些函数,从而提升IO操作性能,另外一点就是mmap后的虚拟内存大小必须是内存页大小(通常是4K)的倍数,之所以这么做是为了匹配内存操作。

预热代码
代码语言:javascript
复制
public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // 如果是同步写盘操作,则进行强行刷盘操作
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc  (有什么用?)
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // 把剩余的数据强制刷新到磁盘中
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);

        this.mlock();
    }

这里 MappedFile 已经创建,对应的 Buffer 为 mappedByteBuffer。 mappedByteBuffer 已经通过 mmap 映射,此时操作系统中只是记录了该文件和该 Buffer 的映射关系,而没有映射到物理内存中。这里就通过对该 MappedFile 的每个 Page Cache 进行写入一个字节,通过读写操作把 mmap 映射全部加载到物理内存中。

锁定内存 mlock()

代码语言:javascript
复制
    public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    }

该方法主要是实现文件预热后,防止把预热过的文件被操作系统调到swap空间中。当程序在次读取交换出去的数据的时候会产生缺页异常。

LibC.INSTANCE.mlock 和 LibC.INSTANCE.madvise 都是调用的 Native 方法。

  • LibC.INSTANCE.mlock 方法 实现是将锁住指定的内存区域避免被操作系统调到swap空间中。
  • LibC.INSTANCE.madvise 方法 实现是一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建 MappedFile 文件
  • 文件预热
    • Page Cache
      • mmap
        • 预热代码
        • 锁定内存 mlock()
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档