A Look at RocketMQ's HAClient

code4it
Modified 2019-12-13 10:13:27
Included in the column: 码匠的流水账

This article takes a look at RocketMQ's HAClient.

HAClient

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {
        // Capacity of each read buffer: 4 MB
        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
        private final AtomicReference<String> masterAddress = new AtomicReference<>();
        // 8-byte buffer used to report the slave's offset to the master
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        private long lastWriteTimestamp = System.currentTimeMillis();

        private long currentReportedOffset = 0;
        // Index in byteBufferRead of the first byte that has not been dispatched yet
        private int dispatchPosition = 0;
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        public HAClient() throws IOException {
            this.selector = RemotingUtil.openSelector();
        }

        //......

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        // Heartbeat: report the current offset once the interval has elapsed
                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        // Block for at most 1 second waiting for a ready channel
                        this.selector.select(1000);

                        // Read from the master and dispatch every complete message
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }

                        // Housekeeping: close the connection if it has been idle for too long
                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        // Not connected: wait 5 seconds before the next attempt
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return HAClient.class.getSimpleName();
        }

        //......

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        // Got data: reset the zero-read counter and dispatch complete messages
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        // Nothing available: give up after three consecutive empty reads
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        // Negative read size: the channel has reached end-of-stream
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

        //......
    }
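  • HAClient extends ServiceThread. Its run method loops while isStopped() is false. On each pass it calls connectMaster to check whether a connection to masterAddress is established; if not, it calls waitForRunning(1000 * 5) and tries again.
  • Once connected, it calls isTimeToReportOffset, which computes the difference between the current time and lastWriteTimestamp and returns true if that difference is greater than defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(). When it returns true, the client calls reportSlaveMaxOffset(currentReportedOffset) and closes the connection to the master if the report fails.
  • It then calls selector.select(1000) followed by processReadEvent. processReadEvent reads from socketChannel into byteBufferRead while byteBufferRead.hasRemaining(): every read that returns data triggers dispatchReadRequest, three consecutive zero-byte reads end the loop, and a negative read size or an IOException returns false, which makes run call closeMaster.
  • Finally, if the time since lastWriteTimestamp exceeds getHaHousekeepingInterval(), the connection is treated as expired and closed.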
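
The two timing checks in run can be isolated into a small model. The sketch below is not RocketMQ code: HaTimingSketch, isExpired, and markWrite are hypothetical names, the clock is passed in as a plain long, and the interval values used in the usage note are arbitrary examples. The excerpt above also does not show where lastWriteTimestamp is updated, so markWrite is only an assumed hook for that.

```java
/**
 * Hypothetical stand-alone model of the heartbeat and housekeeping checks
 * in HAClient.run(). Names are illustrative, not RocketMQ API.
 */
final class HaTimingSketch {
    private final long heartbeatIntervalMs;    // stands in for getHaSendHeartbeatInterval()
    private final long housekeepingIntervalMs; // stands in for getHaHousekeepingInterval()
    private long lastWriteTimestamp;

    HaTimingSketch(long heartbeatIntervalMs, long housekeepingIntervalMs, long now) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.housekeepingIntervalMs = housekeepingIntervalMs;
        this.lastWriteTimestamp = now;
    }

    /** True once strictly more than the heartbeat interval has passed since the last write. */
    boolean isTimeToReportOffset(long now) {
        return now - lastWriteTimestamp > heartbeatIntervalMs;
    }

    /** True once the connection has been idle longer than the housekeeping interval. */
    boolean isExpired(long now) {
        return now - lastWriteTimestamp > housekeepingIntervalMs;
    }

    /** Assumed hook: record that something was just written to the master. */
    void markWrite(long now) {
        this.lastWriteTimestamp = now;
    }
}
```

With example intervals of 5000 ms and 20000 ms, new HaTimingSketch(5_000L, 20_000L, 0L) asks for a heartbeat from t = 5001 onward and counts as expired from t = 20001 onward, because both comparisons are strict, as in the original code.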

dispatchReadRequest

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java

    class HAClient extends ServiceThread {

        //......

        private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            // Remember where the next socket read has to continue writing
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                // Number of bytes received but not yet dispatched
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                if (diff >= msgHeaderSize) {
                    // Absolute reads: the buffer position is not moved
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    // A non-empty slave must continue exactly where its commit log ends
                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }

                    // Header and body are both complete: copy the body and append it
                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);

                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        // Restore the write position and advance past the dispatched message
                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                // Incomplete message: make room if the buffer is full, then wait for more data
                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

        //......
    }
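  • dispatchReadRequest works on the bytes between dispatchPosition and the current position of byteBufferRead. Each message starts with a 12-byte header: an 8-byte physical offset (masterPhyOffset) followed by a 4-byte body size. If the slave's max physical offset is non-zero and differs from masterPhyOffset, the method logs an error and returns false. When diff >= (msgHeaderSize + bodySize), meaning a complete message is buffered, it copies the body, calls defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData), advances dispatchPosition, and calls reportSlaveMaxOffsetPlus. Otherwise it leaves the partial message in place, calling reallocateByteBuffer first if byteBufferRead is full.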
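
The framing logic can be tried out without a broker. The sketch below is a stand-alone model, not RocketMQ code: HaFrameSketch, Frame, dispatch, and putFrame are hypothetical names, the frames are returned in a list instead of being appended to a commit log, and the slave-offset check and buffer reallocation are left out. The negative-size guard is an addition of this sketch and is not in the original method.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the 8 + 4 byte framing; not RocketMQ API. */
final class HaFrameSketch {
    static final int HEADER_SIZE = 8 + 4; // phyOffset (long) + bodySize (int)

    /** One decoded frame: the master's physical offset plus the raw body. */
    static final class Frame {
        final long phyOffset;
        final byte[] body;

        Frame(long phyOffset, byte[] body) {
            this.phyOffset = phyOffset;
            this.body = body;
        }
    }

    // Index of the first byte that has not been handed out as a frame yet.
    private int dispatchPosition = 0;

    /** Decodes every complete frame between dispatchPosition and buf.position(). */
    List<Frame> dispatch(ByteBuffer buf) {
        List<Frame> frames = new ArrayList<>();
        int readSocketPos = buf.position(); // where the next socket read would write

        while (readSocketPos - dispatchPosition >= HEADER_SIZE) {
            // Absolute gets leave the buffer's position untouched.
            long phyOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (bodySize < 0) { // guard added by this sketch only
                throw new IllegalStateException("negative body size: " + bodySize);
            }
            if (readSocketPos - dispatchPosition < (long) HEADER_SIZE + bodySize) {
                break; // body not fully received yet; keep the partial frame
            }

            byte[] body = new byte[bodySize];
            buf.position(dispatchPosition + HEADER_SIZE);
            buf.get(body);
            buf.position(readSocketPos); // restore the write position

            frames.add(new Frame(phyOffset, body));
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return frames;
    }

    int dispatchPosition() {
        return dispatchPosition;
    }

    /** Helper for experiments: appends one frame to buf. */
    static void putFrame(ByteBuffer buf, long phyOffset, byte[] body) {
        buf.putLong(phyOffset).putInt(body.length).put(body);
    }
}
```

Keeping the buffer in write mode and reading with absolute indexes is what lets the same buffer receive further socket data after a partial frame: nothing is flipped or compacted between calls, and only dispatchPosition moves.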

Summary

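  • HAClient extends ServiceThread, and its run method loops until the service is stopped. When connectMaster fails, it waits 5 seconds via waitForRunning(1000 * 5). When connected, it reports the slave's offset whenever isTimeToReportOffset returns true, that is, whenever the time since lastWriteTimestamp exceeds getHaSendHeartbeatInterval(). It then runs processReadEvent, which reads into byteBufferRead while the buffer has space left and hands the data to dispatchReadRequest; dispatchReadRequest in turn appends each complete message to the commit log through appendToCommitLog.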
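
The return-value contract of the read loop (true to keep the connection, false to close it) can be exercised with a fake channel. This is a minimal sketch under stated assumptions: ReadLoopSketch, drain, and ScriptedChannel are hypothetical names, dispatching is reduced to a comment, and the fake channel simply replays a script of chunks instead of talking to a master.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical model of the processReadEvent loop; not RocketMQ API. */
final class ReadLoopSketch {
    static final int MAX_ZERO_READS = 3;

    /** Returns false when the caller should close the connection, true otherwise. */
    static boolean drain(ReadableByteChannel channel, ByteBuffer buf) {
        int zeroReads = 0;
        while (buf.hasRemaining()) {
            try {
                int n = channel.read(buf);
                if (n > 0) {
                    zeroReads = 0; // a real client would dispatch complete frames here
                } else if (n == 0) {
                    if (++zeroReads >= MAX_ZERO_READS) {
                        break; // nothing more for now; not an error
                    }
                } else {
                    return false; // -1: end-of-stream
                }
            } catch (IOException e) {
                return false;
            }
        }
        return true;
    }
}

/** Test double: each read() serves the next scripted chunk, then reports end-of-stream. */
final class ScriptedChannel implements ReadableByteChannel {
    private final Deque<byte[]> chunks = new ArrayDeque<>();
    int reads = 0; // number of read() calls so far

    ScriptedChannel(byte[]... script) {
        chunks.addAll(Arrays.asList(script));
    }

    @Override
    public int read(ByteBuffer dst) {
        reads++;
        byte[] next = chunks.poll();
        if (next == null) {
            return -1; // script exhausted
        }
        dst.put(next); // assumes the chunk fits into dst
        return next.length;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```

A script of one 2-byte chunk followed by three empty chunks makes drain return true after four reads, whereas a script that ends right after the data chunk makes the second read report end-of-stream and drain return false.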

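Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.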
