专栏首页码匠的流水账聊聊rocketmq的updateTopicRouteInfoFromNameServer
原创

聊聊rocketmq的updateTopicRouteInfoFromNameServer

本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer

updateTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
​
    //......
​
    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();
​
        // Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
​
        // Producer
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    Set<String> lst = impl.getPublishTopicList();
                    topicList.addAll(lst);
                }
            }
        }
​
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
​
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
​
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
​
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
​
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
​
                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
​
                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
​
        return false;
    }
​
    //......
}
  • updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

getTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {
​
    //......
​
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
​
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }
​
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
​
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
​
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }
​
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }
​
        throw new MQClientException(response.getCode(), response.getRemark());
    }    
​
    //......
}
  • getTopicRouteInfoFromNameServer方法构造RequestCode.GET_ROUTEINTO_BY_TOPIC,若response.getCode为ResponseCode.SUCCESS,则使用TopicRouteData.decode(body, TopicRouteData.class)解析为TopicRouteData;这里remotingClient.invokeSync的addr参数为null

invokeSync

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
​
    //......
​
    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
​
    //......
​
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
​
    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }
​
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
​
        return this.createChannel(addr);
    }
​
    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }
​
        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }
​
                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);
​
                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null) {
                            return channelNew;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }
​
        return null;
    }
​
    private static int initValueIndex() {
        Random r = new Random();
​
        return Math.abs(r.nextInt() % 999) % 999;
    }
​
    //......
}          
  • invokeSync首先通过getAndCreateChannel获取channel,而getAndCreateChannel方法在addr为null时执行的是getAndCreateNameserverChannel;这里取的是namesrvAddrChoosed.get(),若不为null则返回,为null的话则先从namesrvIndex.incrementAndGet()获取index,取绝对值,然后再对addrList.size()取余数作为选中的namesrv的地址,更新到namesrvAddrChoosed;namesrvIndex的初始值为initValueIndex,它通过Math.abs(r.nextInt() % 999) % 999算出一个随机初始值

小结

  • MQClientInstance的updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • futureTask的超时原理解析

    java/util/concurrent/AbstractExecutorService.java

    codecraft
  • 聊聊artemis的gracefulShutdownEnabled

    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis...

    codecraft
  • 聊聊chronos的pullFromDefaultCFAndPush

    DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService...

    codecraft
  • Mybatis之ParameterHandler

        可以看到,使用了TypeHandler的setParameter方法来设置参数,ParameterHandler的实现相对简单。

    克虏伯
  • Java开发知识之Java控制语句

      不管任何语言都有控制语句 if else if else whie do while for .... 首先讲解的是java的复合语句

    IBinary
  • 聊聊chronos的pullFromDefaultCFAndPush

    DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService...

    codecraft
  • 【笑话】程序猿才懂得幽默……第N波

    1、上完厕所正准备走,隔间传来一阵哀怨的声音:“哥们,帮帮我好吗?”我叹了口气:“不带纸就敢蹲坑,你也是蛮拼的!”他尴尬地笑了笑,说:“不是,你能不能帮我去问下...

    小莹莹
  • python简单语法2

    py3study
  • 两条报警信息的分析(第一篇) (r6笔记第70天)

    任何规则都是固定的,但是人是活的,很多时候把一些细节之处结合起来,还是能够发现一些潜在的问题。 早上收到zabbix的报警,是两条看似很平常的短信。 一封邮件内...

    jeanron100
  • 从Preact中了解React组件和hooks基本原理

    React 的代码库现在已经比较庞大了,加上 v16 的 Fiber 重构,初学者很容易陷入细节的汪洋大海,搞懂了会让人觉得自己很牛逼,搞不懂很容易让人失去信心...

    Nealyang

扫码关注云+社区

领取腾讯云代金券