专栏首页码匠的流水账聊聊rocketmq的FlushConsumeQueueService

聊聊rocketmq的FlushConsumeQueueService

本文主要研究一下rocketmq的FlushConsumeQueueService

FlushConsumeQueueService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

    class FlushConsumeQueueService extends ServiceThread {
        private static final int RETRY_TIMES_OVER = 3;
        private long lastFlushTimestamp = 0;

        private void doFlush(int retryTimes) {
            int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

            if (retryTimes == RETRY_TIMES_OVER) {
                flushConsumeQueueLeastPages = 0;
            }

            long logicsMsgTimestamp = 0;

            int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushConsumeQueueLeastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

            for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                for (ConsumeQueue cq : maps.values()) {
                    boolean result = false;
                    for (int i = 0; i < retryTimes && !result; i++) {
                        result = cq.flush(flushConsumeQueueLeastPages);
                    }
                }
            }

            if (0 == flushConsumeQueueLeastPages) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                    this.waitForRunning(interval);
                    this.doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            this.doFlush(RETRY_TIMES_OVER);

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }

        @Override
        public long getJointime() {
            return 1000 * 60;
        }
    }
  • FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

ConsumeQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

public class ConsumeQueue {

    //......

    public boolean flush(final int flushLeastPages) {
        boolean result = this.mappedFileQueue.flush(flushLeastPages);
        if (isExtReadEnable()) {
            result = result & this.consumeQueueExt.flush(flushLeastPages);
        }

        return result;
    }

    //......
}
  • flush方法执行的是mappedFileQueue.flush(flushLeastPages),如果isExtReadEnable()为true则还会执行consumeQueueExt.flush(flushLeastPages)

MappedFileQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

public class MappedFileQueue {

    //......

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    //......
}
  • MappedFileQueue的flush方法则主要执行mappedFile.flush(flushLeastPages)

MappedFile

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFile.java

public class MappedFile extends ReferenceResource {

    //......

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

    //......
}
  • MappedFile的flush方法主要是执行fileChannel.force(false)或者mappedByteBuffer.force()

小结

FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

doc

  • DefaultMessageStore

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-12-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊rocketmq的FlushConsumeQueueService

    本文主要研究一下rocketmq的FlushConsumeQueueService

    codecraft
  • 聊聊rocketmq的PushConsumerImpl

    io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

    codecraft
  • 聊聊rocketmq的ScheduleMessageService

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/...

    codecraft
  • 聊聊rocketmq的FlushConsumeQueueService

    本文主要研究一下rocketmq的FlushConsumeQueueService

    codecraft
  • Button按钮--inject与provide

    inject 和 provider 是vue中的组合选项,需要一起使用。目的是允许一个祖先组件向其所有子孙后代注入依赖(简单地说就是祖先组件向子孙后代传值的一种...

    用户1148399
  • 一个全能的挖孔 Shader

    TouchBlocker 是用来限制可点击的节点的独立组件,完整文件在 eazax-ccc/component 目录下。

    白玉无冰
  • 【译】理解JavaScript中的This,Bind,Call和Apply

    this关键词在JavaScript中是个很重要的概念,也是一个对初学者和学习其他语言的人来说晦涩难懂。在JavaScript中,this是一个对象的引用。th...

    嘉明
  • js实现图片3D轮播效果(收藏)

    何处锦绣不灰堆
  • Mybatis 源码分析(二)之 Mybatis 操作数据库的流程

    Mybatis系列: Mybatis 基础介绍与逆向工程的构建 :https://www.jianshu.com/p/1c18db4d7a38 Mybati...

    zoro
  • RocketMQ 源码分析 —— 高可用

    本文主要解析 Namesrv、Broker 如何实现高可用,Producer、Consumer 怎么与它们通信保证高可用。

    芋道源码

扫码关注云+社区

领取腾讯云代金券