
A look at rocketmq's updateConsumeOffsetToBroker

code4it
Last modified: 2019-12-04 10:07:11
Included in the column: 码匠的流水账

This article takes a look at rocketmq's updateConsumeOffsetToBroker.

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java

public class RemoteBrokerOffsetStore implements OffsetStore {

    //......

    /**
     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
     * here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

    //......
}
  • The updateConsumeOffsetToBroker method of RemoteBrokerOffsetStore first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain findBrokerResult
  • If that returns null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) to refresh the route info, then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) again
  • If findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader (topic, consumer group, queue id, commit offset) and, depending on isOneway, calls mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset with a timeout of 1000 * 5; if it is still null, it throws MQClientException
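The "look up, refresh on a miss, look up again, then send or fail" flow described above can be sketched without any RocketMQ dependency. This is only a minimal illustration under stated assumptions: the class LookupRefreshRetrySketch, its two in-memory maps, and the string it returns are hypothetical stand-ins for the client's route cache, the name server, and the remote call, not RocketMQ APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup/refresh/retry flow; not a RocketMQ class.
final class LookupRefreshRetrySketch {
    // Simulated local route cache: brokerName -> address (assumption).
    private final Map<String, String> localCache = new HashMap<>();
    // Simulated name server view: brokerName -> address (assumption).
    private final Map<String, String> nameServer = new HashMap<>();
    // Counts how many times the cache had to be refreshed.
    int refreshCount = 0;

    LookupRefreshRetrySketch(Map<String, String> nameServerView) {
        this.nameServer.putAll(nameServerView);
    }

    // Plays the role of findBrokerAddressInAdmin: cache lookup only.
    String findBrokerAddress(String brokerName) {
        return localCache.get(brokerName);
    }

    // Plays the role of updateTopicRouteInfoFromNameServer: reload the cache.
    void refreshFromNameServer() {
        refreshCount++;
        localCache.putAll(nameServer);
    }

    // Mirrors the control flow of updateConsumeOffsetToBroker.
    String commitOffset(String brokerName, long offset, boolean oneway) {
        String addr = findBrokerAddress(brokerName);
        if (addr == null) {
            // Cache miss: refresh once, then retry the lookup.
            refreshFromNameServer();
            addr = findBrokerAddress(brokerName);
        }
        if (addr == null) {
            // Still unknown after the refresh: give up, as the original does.
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        // A real client would send the request here; the sketch only describes it.
        return (oneway ? "ONEWAY" : "SYNC") + " commit offset=" + offset + " to " + addr;
    }

    public static void main(String[] args) {
        Map<String, String> ns = new HashMap<>();
        ns.put("broker-a", "10.0.0.1:10911");
        LookupRefreshRetrySketch sketch = new LookupRefreshRetrySketch(ns);
        System.out.println(sketch.commitOffset("broker-a", 42L, true));
        System.out.println("refreshes: " + sketch.refreshCount);
    }
}
```

Note that the refresh happens at most once per call: a second miss is treated as an error rather than retried.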

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

    //......

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

    //......
}
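  • The findBrokerAddressInAdmin method first looks up, in brokerAddrTable, the map of brokerId to address for the given brokerName, then iterates over that map. At the first entry whose brokerAddr is not null it sets found to true, sets slave to false if the brokerId equals MixAll.MASTER_ID and to true otherwise, and breaks out of the loop. If found is true it constructs and returns a FindBrokerResult; otherwise it returns null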
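The selection rule above (take the first entry with a non-null address, and derive the slave flag from its id) can be tried out on a plain map. This is a sketch under stated assumptions: FindBrokerSketch, Result, and findFirstAddress are hypothetical names, the broker version lookup is left out, and the demo uses a LinkedHashMap only to make the iteration order explicit, whereas the original iterates a HashMap, whose order is not guaranteed in general.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the selection logic; not a RocketMQ class.
final class FindBrokerSketch {
    // Mirrors MixAll.MASTER_ID, which is 0 in RocketMQ.
    static final long MASTER_ID = 0L;

    // Simplified stand-in for FindBrokerResult (no broker version field).
    static final class Result {
        final String brokerAddr;
        final boolean slave;

        Result(String brokerAddr, boolean slave) {
            this.brokerAddr = brokerAddr;
            this.slave = slave;
        }
    }

    // Returns the first entry with a non-null address, or null if there is none.
    static Result findFirstAddress(Map<Long, String> addrsById) {
        if (addrsById == null || addrsById.isEmpty()) {
            return null;
        }
        for (Map.Entry<Long, String> entry : addrsById.entrySet()) {
            String addr = entry.getValue();
            if (addr != null) {
                // Any id other than MASTER_ID is treated as a slave.
                boolean slave = entry.getKey().longValue() != MASTER_ID;
                return new Result(addr, slave);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new LinkedHashMap<>();
        addrs.put(1L, "10.0.0.2:10911"); // slave listed first on purpose
        addrs.put(0L, "10.0.0.1:10911"); // master
        Result r = findFirstAddress(addrs);
        System.out.println(r.brokerAddr + " slave=" + r.slave);
    }
}
```

The point of the demo is that the rule does not prefer the master: whichever non-null entry the iteration reaches first is returned.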

Summary

  • The updateConsumeOffsetToBroker method of RemoteBrokerOffsetStore first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain findBrokerResult
  • If that returns null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()), then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) again to obtain findBrokerResult
  • When findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader and then calls either mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset


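Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.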
