专栏首页java 成神之路RocketMQ 同步刷盘实现原理

RocketMQ 同步刷盘实现原理

未写完,待续。。。。

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // 判断该 broke 是否设置同步刷盘: flushDiskType = SYNC_FLUSH
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 获取刷盘服务类。该服务类有三种实现方式。
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            //封装刷盘请求对象
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //添加刷盘请求(后台定时任务进行刷盘,每隔10毫秒批量刷盘。10毫秒中如果有多个请求,则多个请求一块刷盘)
            service.putRequest(request);
            //等待刷盘请求结果(最长等待5秒钟,刷盘成功后马上可以获取结果。)
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // 异步刷新,异步刷新有两种方式进行刷新
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

刷盘服务类有三种

  • 同步刷盘时使用 GroupCommitService
  • 异步刷盘时使用 FlushRealTimeService
  • 如果开启 isTransientStorePoolEnable 则同时也使用 CommitRealTimeService。

CommitRealTimeService 只有当前是异步刷盘策略、broker是master 、并且开启 transientStorePoolEnable 才可以启动 CommitRealTimeService 刷盘策略。 CommitRealTimeService 刷盘策略和 FlushRealTimeService 刷盘策略是同时运行的

这里先介绍下同步刷盘策略

同步刷盘策略

class GroupCommitService extends FlushCommitLogService {
    // 存储读请求和写请求
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
    // put 请求
    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // 省略代码
        ......
    }
    // 省略代码
    ......
    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }

                    req.wakeupCustomer(flushOK);
                }

                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }

                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.waitForRunning(10);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    protected void onWaitEnd() {
        this.swapRequests();
    }

    @Override
    public String getServiceName() {
        return GroupCommitService.class.getSimpleName();
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ 生产者 Producer 启动过程

    从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。

    java404
  • RocketMQ MappedFile 预热原理解析

    从代码中可以看出,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且开启文件预热功能才会预加载文件。 CommitLog 文件的大小...

    java404
  • IO、NIO、AIO 内部原理分析

    java404
  • 实现一个Promise之基础、异步

    其实跟着promise a+一步一步,按照顺序实现一个promise并不难,今天先实现最简单的promise和异步resolve。本来想全部一次性写完,想想还是...

    wade
  • react学习:React状态

    爱明依
  • Phaser.js之简单的跑酷游戏

    源码(详细源码图片资源可点击文章下方或屏幕右上方的github链接进行clone)

    ProsperLee
  • Kotlin---标准扩展函数

    除了自定义扩展之外,Kotlin中也定义了很多的扩展函数,而这些扩展函数的接收类型是范型,也就是所有对象都可以使用。这些标准的扩展函数都放在了Standard....

    None_Ling
  • Flutter第5天--布局实例+操作交互

    张风捷特烈
  • JavaScript设计模式--简单工厂模式

    工厂模式定义一个用于创建对象的接口,这个接口由子类决定实例化哪一个类。该模式使一个类的实例化延迟到了子类。而子类可以重写接口方法以便创建的时候指定自己的对象类型...

    wfaceboss
  • Vue:父子组件信息传递

    MrTreasure

扫码关注云+社区

领取腾讯云代金券