
A Look at RocketMQ's DLedgerRoleChangeHandler

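This article walks through RocketMQ's DLedgerRoleChangeHandler, the class that switches a broker between master and slave when its DLedger role changes.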

DLedgerRoleChangeHandler

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java

public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // single worker thread: role changes run one at a time, in the order they were submitted
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
    private BrokerController brokerController;
    private DefaultMessageStore messageStore;
    private DLedgerCommitLog dLedgerCommitLog;
    private DLedgerServer dLegerServer;
    public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
        this.brokerController = brokerController;
        this.messageStore = messageStore;
        this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
        this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
    }

    @Override public void handle(long term, MemberState.Role role) {
        Runnable runnable = new Runnable() {
            @Override public void run() {
                long start = System.currentTimeMillis();
                try {
                    boolean succ = true;
                    log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                    switch (role) {
                        case CANDIDATE:
                            if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                                brokerController.changeToSlave(dLedgerCommitLog.getId());
                            }
                            break;
                        case FOLLOWER:
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                            break;
                        case LEADER:
                            while (true) {
                                if (!dLegerServer.getMemberState().isLeader()) {
                                    succ = false;
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                    && messageStore.dispatchBehindBytes() == 0) {
                                    break;
                                }
                                Thread.sleep(100);
                            }
                            if (succ) {
                                messageStore.recoverTopicQueueTable();
                                brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                            }
                            break;
                        default:
                            break;
                    }
                    log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
                } catch (Throwable t) {
                    log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
                }
            }
        };
        executorService.submit(runnable);
    }

    @Override public void startup() {

    }

    @Override public void shutdown() {
        executorService.shutdown();
    }
}
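  • DLedgerRoleChangeHandler implements the DLedgerLeaderElector.RoleChangeHandler interface. Its handle method submits a Runnable to executorService, a single-thread executor, so role changes are processed one at a time in submission order; its shutdown method calls executorService.shutdown().
  • The Runnable branches on MemberState.Role. For CANDIDATE it calls brokerController.changeToSlave(dLedgerCommitLog.getId()) only if messageStore.getMessageStoreConfig().getBrokerRole() is not already SLAVE; for FOLLOWER it always calls brokerController.changeToSlave(dLedgerCommitLog.getId()).
  • For LEADER it polls every 100 ms until one of three things happens: the node is no longer leader (succ is set to false and the promotion is skipped), the ledger is empty (getLedgerEndIndex() == -1), or getLedgerEndIndex() equals getCommittedIndex() and messageStore.dispatchBehindBytes() is 0. If succ is still true, it calls messageStore.recoverTopicQueueTable() and then brokerController.changeToMaster(BrokerRole.SYNC_MASTER).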
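The LEADER branch is essentially a polling loop with three exits. Below is a minimal sketch of that control flow, not the RocketMQ implementation: the class name, the method waitUntilCaughtUp and the four supplier parameters are hypothetical stand-ins for the dLegerServer and messageStore lookups, and treating an interrupt as "do not promote" is a choice made only for this sketch (in the original, the exception would reach the catch (Throwable t) block).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class LeaderCatchUpSketch {

    /**
     * Returns true once the node may be promoted to master,
     * false if leadership is lost (or the thread is interrupted) while waiting.
     * All parameters are hypothetical stand-ins, not RocketMQ APIs.
     */
    static boolean waitUntilCaughtUp(BooleanSupplier isLeader,
                                     LongSupplier ledgerEndIndex,
                                     LongSupplier committedIndex,
                                     LongSupplier dispatchBehindBytes,
                                     long pollMillis) {
        while (true) {
            if (!isLeader.getAsBoolean()) {
                return false;                 // exit 1: no longer leader, skip the promotion
            }
            long end = ledgerEndIndex.getAsLong();
            if (end == -1) {
                return true;                  // exit 2: empty ledger, nothing to catch up on
            }
            if (end == committedIndex.getAsLong() && dispatchBehindBytes.getAsLong() == 0) {
                return true;                  // exit 3: everything is committed and dispatched
            }
            try {
                Thread.sleep(pollMillis);     // the original sleeps 100 ms between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;                 // sketch-only choice, see the lead-in
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong committed = new AtomicLong(7);
        // The committed index advances by one per poll until it reaches the end index 10.
        boolean promoted = waitUntilCaughtUp(
            () -> true,
            () -> 10L,
            () -> Math.min(10L, committed.incrementAndGet()),
            () -> 0L,
            10);
        System.out.println("promoted=" + promoted);
    }
}
```

Passing the lookups in as suppliers keeps the loop testable without a running DLedger group; the real handler reads them directly from dLegerServer and messageStore.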

changeToSlave

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

    //......

    public void changeToSlave(int brokerId) {
        log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

        //change the role
        brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
        messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
        }

        //handle the transactional service
        try {
            this.shutdownProcessorByHa();
        } catch (Throwable t) {
            log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
        }

        //handle the slave synchronise
        handleSlaveSynchronize(BrokerRole.SLAVE);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {

        }
        log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    }

    //......
}
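  • changeToSlave first sets the broker id to brokerId (mapping 0 to 1) and sets the broker role in messageStoreConfig to BrokerRole.SLAVE. It then calls messageStore.handleScheduleMessageService(BrokerRole.SLAVE), shutdownProcessorByHa(), handleSlaveSynchronize(BrokerRole.SLAVE) and finally registerBrokerAll(true, true, brokerConfig.isForceRegister()). Failures in handleScheduleMessageService and shutdownProcessorByHa are logged with a [MONITOR] prefix and failures in registerBrokerAll are ignored, so none of them aborts the role change.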
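Most steps in changeToSlave are wrapped in their own try/catch, so one failing step does not stop the ones after it. A minimal sketch of that best-effort pattern follows; the class BestEffortStepsSketch and the helper runAll are hypothetical and do not exist in RocketMQ, and the step names in main are only labels borrowed from the listing above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BestEffortStepsSketch {

    /** Runs every step in insertion order, even if an earlier one throws; returns the names of the failed steps. */
    static List<String> runAll(Map<String, Runnable> steps) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (Throwable t) {
                // Log and continue with the next step instead of aborting the role change.
                System.err.println("[MONITOR] " + step.getKey() + " failed: " + t);
                failed.add(step.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> executed = new ArrayList<>();
        Map<String, Runnable> steps = new LinkedHashMap<>();
        steps.put("handleScheduleMessageService", () -> executed.add("schedule"));
        steps.put("shutdownProcessorByHa", () -> { throw new IllegalStateException("simulated failure"); });
        steps.put("registerBrokerAll", () -> executed.add("register"));

        List<String> failed = runAll(steps);
        // prints: executed=[schedule, register] failed=[shutdownProcessorByHa]
        System.out.println("executed=" + executed + " failed=" + failed);
    }
}
```

The trade-off is that the broker always ends up registered in its new role, at the cost of possibly running with a service that failed to stop; the [MONITOR] log lines are the only signal of that.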

changeToMaster

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

    //......

    public void changeToMaster(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            return;
        }
        log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

        //handle the slave synchronise
        handleSlaveSynchronize(role);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(role);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
        }

        //handle the transactional service
        try {
            this.startProcessorByHa(BrokerRole.SYNC_MASTER);
        } catch (Throwable t) {
            log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
        }

        //if the operations above are totally successful, we change to master
        brokerConfig.setBrokerId(0); //TO DO check
        messageStoreConfig.setBrokerRole(role);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {

        }
        log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
    }

    //......
}
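  • changeToMaster returns immediately if the requested role is BrokerRole.SLAVE. Otherwise it calls handleSlaveSynchronize(role), messageStore.handleScheduleMessageService(role) and startProcessorByHa(BrokerRole.SYNC_MASTER), then sets the broker id to 0 and the broker role to role, and finally calls registerBrokerAll(true, true, brokerConfig.isForceRegister()). Note that startProcessorByHa is always passed SYNC_MASTER, regardless of the role argument.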
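The two methods update the broker id and role at different points: changeToSlave flips them first, changeToMaster flips them last. A toy model of just that state transition, with all service handling left out, might look like the following; RoleSwitchSketch and its nested Role enum are hypothetical and only mirror the names used in the listings.

```java
public class RoleSwitchSketch {

    /** Mirrors the three BrokerRole values; defined locally so the sketch has no dependencies. */
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    long brokerId = 1;
    Role role = Role.SLAVE;

    void changeToSlave(int id) {
        // Id and role change first; 0 is the master's id, so a slave is moved to 1.
        brokerId = (id == 0) ? 1 : id;
        role = Role.SLAVE;
        // Master-only services would be stopped here, followed by re-registration.
    }

    void changeToMaster(Role newRole) {
        if (newRole == Role.SLAVE) {
            return;                 // guard: asking to become SLAVE through this path is a no-op
        }
        // Master-only services would be started here, before the id and role change.
        brokerId = 0;
        role = newRole;
    }

    public static void main(String[] args) {
        RoleSwitchSketch broker = new RoleSwitchSketch();

        broker.changeToMaster(Role.SLAVE);
        System.out.println(broker.brokerId + " " + broker.role);   // prints: 1 SLAVE

        broker.changeToMaster(Role.SYNC_MASTER);
        System.out.println(broker.brokerId + " " + broker.role);   // prints: 0 SYNC_MASTER

        broker.changeToSlave(0);
        System.out.println(broker.brokerId + " " + broker.role);   // prints: 1 SLAVE
    }
}
```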

Summary

DLedgerRoleChangeHandler implements the DLedgerLeaderElector.RoleChangeHandler interface. Its handle method submits a Runnable to executorService, and its shutdown method calls executorService.shutdown(). The Runnable branches on MemberState.Role: for CANDIDATE, it calls brokerController.changeToSlave(dLedgerCommitLog.getId()) if messageStore.getMessageStoreConfig().getBrokerRole() is not SLAVE; for FOLLOWER, it calls brokerController.changeToSlave(dLedgerCommitLog.getId()) unconditionally; for LEADER, it waits until the ledger is committed and dispatched, then calls messageStore.recoverTopicQueueTable() and brokerController.changeToMaster(BrokerRole.SYNC_MASTER), unless leadership was lost while waiting.

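Original-work statement: this article is published on the Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.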
