A look at RocketMQ's FlushConsumeQueueService

Original article by code4it, last modified 2019-12-23 11:20:46. Part of the column 码匠的流水账.

This article examines RocketMQ's FlushConsumeQueueService.

FlushConsumeQueueService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

    class FlushConsumeQueueService extends ServiceThread {
        private static final int RETRY_TIMES_OVER = 3;
        private long lastFlushTimestamp = 0;

        private void doFlush(int retryTimes) {
            int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

            // Final flush after the run loop exits: ignore the page threshold.
            if (retryTimes == RETRY_TIMES_OVER) {
                flushConsumeQueueLeastPages = 0;
            }

            long logicsMsgTimestamp = 0;

            // Once the thorough interval has elapsed, also drop the threshold to 0.
            int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushConsumeQueueLeastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

            for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                for (ConsumeQueue cq : maps.values()) {
                    // Call flush until it returns true or retryTimes is exhausted.
                    boolean result = false;
                    for (int i = 0; i < retryTimes && !result; i++) {
                        result = cq.flush(flushConsumeQueueLeastPages);
                    }
                }
            }

            // The checkpoint is flushed only when the threshold was reset to 0.
            if (0 == flushConsumeQueueLeastPages) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                    this.waitForRunning(interval);
                    this.doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // One last flush with retries once the service has been stopped.
            this.doFlush(RETRY_TIMES_OVER);

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }

        @Override
        public long getJointime() {
            return 1000 * 60;
        }
    }
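  • FlushConsumeQueueService extends ServiceThread. Its run method loops until the service is stopped: it reads the interval from DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(), calls waitForRunning(interval), and then calls doFlush(1); once the loop exits it calls doFlush(RETRY_TIMES_OVER) one last time. doFlush reads flushConsumeQueueLeastPages from getMessageStoreConfig().getFlushConsumeQueueLeastPages() and resets it to 0 when retryTimes equals RETRY_TIMES_OVER or when flushConsumeQueueThoroughInterval has elapsed since lastFlushTimestamp. It then iterates over consumeQueueTable and calls ConsumeQueue.flush(flushConsumeQueueLeastPages) on each queue, at most retryTimes times. Finally, only when flushConsumeQueueLeastPages is 0, it calls getStoreCheckpoint().flush().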
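The threshold logic in doFlush is easy to misread, so here is a minimal sketch that isolates it from the store. The class FlushThresholdSketch, its Decision type, and the decide method are hypothetical names invented for illustration; the configuration values passed in main are made up and are not claimed to be RocketMQ's defaults.

```java
/** Hypothetical, simplified model of how doFlush picks its page threshold. */
final class FlushThresholdSketch {
    static final int RETRY_TIMES_OVER = 3;

    /** Outcome of one decision: threshold to use, whether the checkpoint is flushed, updated timestamp. */
    static final class Decision {
        final int leastPages;
        final boolean flushCheckpoint;
        final long lastFlushTimestamp;

        Decision(int leastPages, boolean flushCheckpoint, long lastFlushTimestamp) {
            this.leastPages = leastPages;
            this.flushCheckpoint = flushCheckpoint;
            this.lastFlushTimestamp = lastFlushTimestamp;
        }

        @Override
        public String toString() {
            return "leastPages=" + leastPages + ", flushCheckpoint=" + flushCheckpoint
                + ", lastFlushTimestamp=" + lastFlushTimestamp;
        }
    }

    static Decision decide(int retryTimes, int configuredLeastPages,
                           long thoroughIntervalMs, long lastFlushTimestamp, long now) {
        int leastPages = configuredLeastPages;
        // Final flush after the run loop: ignore the page threshold.
        if (retryTimes == RETRY_TIMES_OVER) {
            leastPages = 0;
        }
        long newLastFlush = lastFlushTimestamp;
        // Thorough interval elapsed: ignore the page threshold and remember when.
        if (now >= lastFlushTimestamp + thoroughIntervalMs) {
            newLastFlush = now;
            leastPages = 0;
        }
        // The checkpoint is flushed only when the threshold ended up at 0.
        return new Decision(leastPages, leastPages == 0, newLastFlush);
    }

    public static void main(String[] args) {
        // Illustrative values only: threshold 2 pages, thorough interval 60 s.
        System.out.println(decide(1, 2, 60_000L, 50_000L, 60_000L));
        System.out.println(decide(1, 2, 60_000L, 0L, 60_000L));
        System.out.println(decide(RETRY_TIMES_OVER, 2, 60_000L, 50_000L, 60_000L));
    }
}
```

In this model an ordinary periodic call keeps the configured threshold and leaves the checkpoint alone, while both the thorough-interval path and the shutdown path end with a threshold of 0 and a checkpoint flush.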

ConsumeQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

public class ConsumeQueue {

    //......

    public boolean flush(final int flushLeastPages) {
        boolean result = this.mappedFileQueue.flush(flushLeastPages);
        if (isExtReadEnable()) {
            result = result & this.consumeQueueExt.flush(flushLeastPages);
        }

        return result;
    }

    //......
}
  • flush calls mappedFileQueue.flush(flushLeastPages); if isExtReadEnable() returns true, it also calls consumeQueueExt.flush(flushLeastPages) and combines the two results with the non-short-circuit & operator.
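One detail worth spelling out is the use of & rather than && in ConsumeQueue.flush: with &, the right-hand flush call runs even when the first result is already false. The sketch below demonstrates that Java behaviour with stand-in methods; NonShortCircuitSketch, flushStub, and the call counter are hypothetical and do not exist in RocketMQ.

```java
/** Hypothetical demo: '&' always evaluates both operands, '&&' may skip the second. */
final class NonShortCircuitSketch {
    int calls = 0;

    /** Stand-in for a flush call; counts invocations and returns the given outcome. */
    boolean flushStub(boolean outcome) {
        calls++;
        return outcome;
    }

    /** Mirrors the shape of ConsumeQueue.flush: both "flushes" always run. */
    boolean flushBoth(boolean mainOutcome, boolean extOutcome) {
        boolean result = flushStub(mainOutcome);
        result = result & flushStub(extOutcome);
        return result;
    }

    /** Same shape with '&&': the second call is skipped when the first returned false. */
    boolean flushShortCircuit(boolean mainOutcome, boolean extOutcome) {
        boolean result = flushStub(mainOutcome);
        result = result && flushStub(extOutcome);
        return result;
    }

    public static void main(String[] args) {
        NonShortCircuitSketch a = new NonShortCircuitSketch();
        a.flushBoth(false, true);
        System.out.println("calls with &:  " + a.calls);

        NonShortCircuitSketch b = new NonShortCircuitSketch();
        b.flushShortCircuit(false, true);
        System.out.println("calls with &&: " + b.calls);
    }
}
```

With && the extension queue could be left unflushed whenever the main queue's flush returned false, so the choice of operator matters here.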

MappedFileQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

public class MappedFileQueue {

    //......

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            // true when the flushed position did not advance in this call
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    //......
}
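  • MappedFileQueue's flush method looks up the MappedFile that contains flushedWhere, calls mappedFile.flush(flushLeastPages) on it, and advances flushedWhere to the new position; it returns true when the flushed position did not move.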
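Because the return value means "nothing moved", the retry loop in doFlush keeps calling flush while progress is still being made. The following sketch models that interaction under loud assumptions: FlushProgressSketch is a hypothetical class, and bytesPerCall is an invented stand-in for the fact that one call only flushes a single mapped file.

```java
/** Hypothetical model of MappedFileQueue.flush's return value and the retry loop around it. */
final class FlushProgressSketch {
    private final long writtenUpTo;   // assumed amount of data waiting to be flushed
    private final long bytesPerCall;  // assumed cap on how far one call can advance
    private long flushedWhere = 0;

    FlushProgressSketch(long writtenUpTo, long bytesPerCall) {
        this.writtenUpTo = writtenUpTo;
        this.bytesPerCall = bytesPerCall;
    }

    /** Returns true when the flushed position did not advance, as in the original method. */
    boolean flush() {
        long where = Math.min(writtenUpTo, flushedWhere + bytesPerCall);
        boolean result = where == flushedWhere;
        flushedWhere = where;
        return result;
    }

    /** Same loop shape as doFlush: stop on the first true or after retryTimes calls. */
    int flushWithRetries(int retryTimes) {
        boolean result = false;
        int calls = 0;
        for (int i = 0; i < retryTimes && !result; i++) {
            result = flush();
            calls++;
        }
        return calls;
    }

    long getFlushedWhere() {
        return flushedWhere;
    }

    public static void main(String[] args) {
        FlushProgressSketch periodic = new FlushProgressSketch(50, 20);
        System.out.println("doFlush(1)-style calls: " + periodic.flushWithRetries(1)
            + ", flushedWhere=" + periodic.getFlushedWhere());

        FlushProgressSketch shutdown = new FlushProgressSketch(50, 20);
        System.out.println("doFlush(3)-style calls: " + shutdown.flushWithRetries(3)
            + ", flushedWhere=" + shutdown.getFlushedWhere());
    }
}
```

In this model a single-try flush can leave data behind, while the three-try variant used at shutdown gets further before giving up.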

MappedFile

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFile.java

public class MappedFile extends ReferenceResource {

    //......

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        // force(false): content must reach storage, metadata need not
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

    //......
}
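  • MappedFile's flush method, when isAbleToFlush(flushLeastPages) allows it, calls fileChannel.force(false) if writeBuffer is non-null or the fileChannel position is non-zero, and mappedByteBuffer.force() otherwise; it then sets flushedPosition to the read position and returns it.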
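Both branches rely on standard Java NIO calls. As a minimal sketch, assuming nothing about RocketMQ beyond the two force calls shown above, the example below writes to a temporary file once through a MappedByteBuffer and once through a FileChannel and forces each to storage. ForceSketch and its roundTrip helper are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical demo of the two flush paths: MappedByteBuffer.force() and FileChannel.force(false). */
final class ForceSketch {

    /** Writes text to a fresh temp file via the chosen path, forces it, and reads it back. */
    static String roundTrip(boolean useMappedBuffer, String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("force-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                if (useMappedBuffer) {
                    // Mapping READ_WRITE past the end of the file grows it to payload.length.
                    MappedByteBuffer mapped =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
                    mapped.put(payload);
                    mapped.force();          // flush the mapped region
                } else {
                    ByteBuffer buffer = ByteBuffer.wrap(payload);
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                    channel.force(false);    // flush content; metadata need not be written
                }
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("mapped buffer path: " + roundTrip(true, "hello"));
        System.out.println("file channel path:  " + roundTrip(false, "hello"));
    }
}
```

Reading the bytes back only shows that the write succeeded; whether data survives a crash depends on force having returned, which this sketch cannot demonstrate.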

Summary

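FlushConsumeQueueService extends ServiceThread. Its run method repeatedly reads the interval from DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(), calls waitForRunning(interval), and then calls doFlush(1), with a final doFlush(RETRY_TIMES_OVER) after the loop exits. doFlush reads flushConsumeQueueLeastPages from getMessageStoreConfig().getFlushConsumeQueueLeastPages(), resets it to 0 on the final flush or once flushConsumeQueueThoroughInterval has elapsed, iterates over consumeQueueTable calling ConsumeQueue.flush(flushConsumeQueueLeastPages), and calls getStoreCheckpoint().flush() only when the threshold is 0. The flush itself is delegated from ConsumeQueue to MappedFileQueue to MappedFile, where it ends in fileChannel.force(false) or mappedByteBuffer.force().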

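Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.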