专栏首页码匠的流水账聊聊rocketmq的FlushConsumeQueueService
原创

聊聊rocketmq的FlushConsumeQueueService

本文主要研究一下rocketmq的FlushConsumeQueueService

FlushConsumeQueueService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

    class FlushConsumeQueueService extends ServiceThread {
        private static final int RETRY_TIMES_OVER = 3;
        private long lastFlushTimestamp = 0;
​
        private void doFlush(int retryTimes) {
            int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
​
            if (retryTimes == RETRY_TIMES_OVER) {
                flushConsumeQueueLeastPages = 0;
            }
​
            long logicsMsgTimestamp = 0;
​
            int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushConsumeQueueLeastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }
​
            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
​
            for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                for (ConsumeQueue cq : maps.values()) {
                    boolean result = false;
                    for (int i = 0; i < retryTimes && !result; i++) {
                        result = cq.flush(flushConsumeQueueLeastPages);
                    }
                }
            }
​
            if (0 == flushConsumeQueueLeastPages) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }
​
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");
​
            while (!this.isStopped()) {
                try {
                    int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                    this.waitForRunning(interval);
                    this.doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
​
            this.doFlush(RETRY_TIMES_OVER);
​
            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }
​
        @Override
        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }
​
        @Override
        public long getJointime() {
            return 1000 * 60;
        }
    }
  • FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

ConsumeQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

public class ConsumeQueue {
​
    //......
​
    public boolean flush(final int flushLeastPages) {
        boolean result = this.mappedFileQueue.flush(flushLeastPages);
        if (isExtReadEnable()) {
            result = result & this.consumeQueueExt.flush(flushLeastPages);
        }
​
        return result;
    }
​
    //......
}
  • flush方法执行的是mappedFileQueue.flush(flushLeastPages),如果isExtReadEnable()为true则还会执行consumeQueueExt.flush(flushLeastPages)

MappedFileQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

public class MappedFileQueue {
​
    //......
​
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
​
        return result;
    }
​
    //......
}
  • MappedFileQueue的flush方法则主要执行mappedFile.flush(flushLeastPages)

MappedFile

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFile.java

public class MappedFile extends ReferenceResource {
​
    //......
​
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
​
                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
​
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
​
    //......
}
  • MappedFile的flush方法主要是执行fileChannel.force(false)或者mappedByteBuffer.force()

小结

FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊rocketmq的FlushConsumeQueueService

    本文主要研究一下rocketmq的FlushConsumeQueueService

    codecraft
  • 聊聊rocketmq的PushConsumerImpl

    io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

    codecraft
  • 聊聊rocketmq的ScheduleMessageService

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/...

    codecraft
  • 聊聊rocketmq的FlushConsumeQueueService

    本文主要研究一下rocketmq的FlushConsumeQueueService

    codecraft
  • 一个全能的挖孔 Shader

    TouchBlocker 是用来限制可点击的节点的独立组件,完整文件在 eazax-ccc/component 目录下。

    白玉无冰
  • Button按钮--inject与provide

    inject 和 provider 是vue中的组合选项,需要一起使用。目的是允许一个祖先组件向其所有子孙后代注入依赖(简单地说就是祖先组件向子孙后代传值的一种...

    用户1148399
  • 【译】理解JavaScript中的This,Bind,Call和Apply

    this关键词在JavaScript中是个很重要的概念,也是一个对初学者和学习其他语言的人来说晦涩难懂。在JavaScript中,this是一个对象的引用。th...

    嘉明
  • 【JS游戏编程基础】关于js里的this关键字的理解

    this关键字在c++,java中都提供了这个关键字,在刚开始学习时觉得有难度,但是只要理解了,用起来就方便多了,下面通过本篇文章给大家详解js里this关键字...

    李海彬
  • RocketMQ 源码分析 —— 高可用

    本文主要解析 Namesrv、Broker 如何实现高可用,Producer、Consumer 怎么与它们通信保证高可用。

    芋道源码
  • js实现单张或多张图片持续无缝滚动

    想要实现图片持续滚动,既然使用js,就千万不要加css动画、过渡等相关样式,如果想要滚动的平滑一下,可以一像素一像素的感动,则很平滑,如果加了过渡动画,当图片重...

    蓓蕾心晴

扫码关注云+社区

领取腾讯云代金券