A Look at RocketMQ's registerProducer and unregisterProducer

This article examines registerProducer and unregisterProducer in RocketMQ.

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
​
    //......
​
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
​
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }
​
        return true;
    }
​
    public void unregisterProducer(final String group) {
        this.producerTable.remove(group);
        this.unregisterClientWithLock(group, null);
    }
​
    private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
        try {
            if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    this.unregisterClient(producerGroup, consumerGroup);
                } catch (Exception e) {
                    log.error("unregisterClient exception", e);
                } finally {
                    this.lockHeartbeat.unlock();
                }
            } else {
                log.warn("lock heartBeat, but failed.");
            }
        } catch (InterruptedException e) {
            log.warn("unregisterClientWithLock exception", e);
        }
    }
​
    private void unregisterClient(final String producerGroup, final String consumerGroup) {
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
​
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    String addr = entry1.getValue();
                    if (addr != null) {
                        try {
                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                            log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                        } catch (RemotingException e) {
                            log.error("unregister client exception from broker: " + addr, e);
                        } catch (InterruptedException e) {
                            log.error("unregister client exception from broker: " + addr, e);
                        } catch (MQBrokerException e) {
                            log.error("unregister client exception from broker: " + addr, e);
                        }
                    }
                }
            }
        }
    }
​
    //......
}
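  • MQClientInstance defines producerTable. Its registerProducer method returns false if group or producer is null; otherwise it calls producerTable.putIfAbsent(group, producer) and returns false when the return value is non-null, that is, when the group is already registered. Its unregisterProducer method calls producerTable.remove(group) and then unregisterClientWithLock, which tries to acquire lockHeartbeat for up to LOCK_TIMEOUT_MILLIS and then delegates to unregisterClient. That method walks brokerAddrTable and calls mQClientAPIImpl.unregisterClient for every broker address.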
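The register/unregister pattern described above can be sketched with JDK classes alone. This is a simplified illustration, not RocketMQ's code: the class name ProducerRegistrySketch, the Object values (standing in for MQProducerInner), and the notified list (standing in for the per-broker mQClientAPIImpl.unregisterClient calls) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerRegistrySketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    // Stand-in for producerTable: group name -> producer (plain Object here).
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    // Hypothetical record of "broker notifications"; the real client sends a remoting request instead.
    private final List<String> notified = new ArrayList<>();

    // Returns true only for the first registration of a group.
    public boolean register(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the key was free.
        return producerTable.putIfAbsent(group, producer) == null;
    }

    // Removes the group locally, then "notifies brokers" under a lock acquired with a timeout.
    public void unregister(String group) {
        producerTable.remove(group);
        try {
            if (lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    notified.add(group);
                } finally {
                    lockHeartbeat.unlock();
                }
            }
        } catch (InterruptedException e) {
            // Assumption of this sketch: restore the interrupt flag (the quoted source only logs).
            Thread.currentThread().interrupt();
        }
    }

    public List<String> notified() {
        return notified;
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        System.out.println(registry.register("groupA", new Object())); // true
        System.out.println(registry.register("groupA", new Object())); // false
        registry.unregister("groupA");
        System.out.println(registry.register("groupA", new Object())); // true
    }
}
```

Because putIfAbsent is atomic, two threads racing to register the same group cannot both succeed, which is presumably why the client relies on it rather than a separate containsKey check followed by put.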

DefaultMQProducerImpl.start

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
​
    //......
​
    public void start() throws MQClientException {
        this.start(true);
    }
​
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
​
                this.checkConfig();
​
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
​
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
​
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
​
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
​
                if (startFactory) {
                    mQClientFactory.start();
                }
​
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
​
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
​
    //......
}
  • When serviceState is CREATE_JUST, DefaultMQProducerImpl's start method calls mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this). If that call returns false, start resets serviceState to CREATE_JUST and throws MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null)
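The state handling around registration may be easier to follow in isolation. The sketch below is a JDK-only approximation under stated assumptions: StartSketch, its constructor taking a shared table, and the use of IllegalStateException in place of MQClientException are all invented for illustration and are not part of RocketMQ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StartSketch {
    public enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    // Hypothetical stand-in for the producerTable of a shared MQClientInstance.
    private final ConcurrentMap<String, StartSketch> table;
    private final String group;
    private State state = State.CREATE_JUST;

    public StartSketch(ConcurrentMap<String, StartSketch> table, String group) {
        this.table = table;
        this.group = group;
    }

    public State state() {
        return state;
    }

    public void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed until every startup step has passed.
                state = State.START_FAILED;
                if (table.putIfAbsent(group, this) != null) {
                    // Duplicate group: roll the state back before reporting the error.
                    state = State.CREATE_JUST;
                    throw new IllegalStateException(
                        "The producer group[" + group + "] has been created before");
                }
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all reject a second start.
                throw new IllegalStateException("The producer service state not OK: " + state);
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, StartSketch> table = new ConcurrentHashMap<>();
        StartSketch a = new StartSketch(table, "groupA");
        StartSketch b = new StartSketch(table, "groupA");
        a.start();
        try {
            b.start();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(a.state() + " " + b.state()); // RUNNING CREATE_JUST
    }
}
```

Resetting the state to CREATE_JUST on a duplicate group means the same instance could call start again later, for example after the other producer has shut down and removed its entry.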

DefaultMQProducerImpl.shutdown

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
​
    //......
​
    public void shutdown() {
        this.shutdown(true);
    }
​
    public void shutdown(final boolean shutdownFactory) {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
                this.defaultAsyncSenderExecutor.shutdown();
                if (shutdownFactory) {
                    this.mQClientFactory.shutdown();
                }
​
                log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
​
    //......
}
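  • When serviceState is RUNNING, DefaultMQProducerImpl's shutdown method calls mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()) and then sets serviceState to SHUTDOWN_ALREADY; in every other state it does nothing.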
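One consequence of this state check is that shutdown is effectively idempotent: only the first call on a running producer unregisters the group. A minimal JDK-only sketch of that behavior follows; ShutdownSketch, markRunning, and the unregisterCalls counter are hypothetical names introduced for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ShutdownSketch {
    public enum State { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY }

    // Hypothetical stand-in for producerTable.
    private final ConcurrentMap<String, ShutdownSketch> table;
    private final String group;
    private State state = State.CREATE_JUST;
    private int unregisterCalls = 0; // counts how often the group was actually unregistered

    public ShutdownSketch(ConcurrentMap<String, ShutdownSketch> table, String group) {
        this.table = table;
        this.group = group;
    }

    // Simplified "start": register the group and move to RUNNING.
    public void markRunning() {
        table.put(group, this);
        state = State.RUNNING;
    }

    public void shutdown() {
        switch (state) {
            case RUNNING:
                table.remove(group);
                unregisterCalls++;
                state = State.SHUTDOWN_ALREADY;
                break;
            default:
                // CREATE_JUST and SHUTDOWN_ALREADY: nothing to unregister.
                break;
        }
    }

    public State state() {
        return state;
    }

    public int unregisterCalls() {
        return unregisterCalls;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, ShutdownSketch> table = new ConcurrentHashMap<>();
        ShutdownSketch p = new ShutdownSketch(table, "groupA");
        p.shutdown();      // never started: no effect
        p.markRunning();
        p.shutdown();      // unregisters the group
        p.shutdown();      // already shut down: no effect
        System.out.println(p.unregisterCalls() + " " + table.containsKey("groupA")); // 1 false
    }
}
```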

Summary

MQClientInstance defines producerTable. Its registerProducer method calls producerTable.putIfAbsent(group, producer) and returns false if the return value is non-null. Its unregisterProducer method calls producerTable.remove(group) and unregisterClientWithLock; the latter mainly runs unregisterClient, which ultimately calls mQClientAPIImpl.unregisterClient.

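Original-content notice: this article is published on the Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.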
