本文主要研究一下RocketMQ的registerProducer与unregisterProducer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance { private final static long LOCK_TIMEOUT_MILLIS = 3000; private final InternalLogger log = ClientLogger.getLog(); private final ClientConfig clientConfig; private final int instanceIndex; private final String clientId; private final long bootTimestamp = System.currentTimeMillis(); private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>(); //...... public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; } public void unregisterProducer(final String group) { this.producerTable.remove(group); this.unregisterClientWithLock(group, null); } private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) { try { if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { this.unregisterClient(producerGroup, consumerGroup); } catch (Exception e) { log.error("unregisterClient exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } } catch (InterruptedException e) { log.warn("unregisterClientWithLock exception", e); } } private void unregisterClient(final String producerGroup, final String consumerGroup) { Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, HashMap<Long, String>> entry = it.next(); String brokerName = entry.getKey(); HashMap<Long, String> oneTable = 
entry.getValue(); if (oneTable != null) { for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) { String addr = entry1.getValue(); if (addr != null) { try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); } catch (RemotingException e) { log.error("unregister client exception from broker: " + addr, e); } catch (InterruptedException e) { log.error("unregister client exception from broker: " + addr, e); } catch (MQBrokerException e) { log.error("unregister client exception from broker: " + addr, e); } } } } } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { //...... public void start() throws MQClientException { this.start(true); } public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { //...... public void shutdown() { this.shutdown(true); } public void shutdown(final boolean shutdownFactory) { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); this.defaultAsyncSenderExecutor.shutdown(); if (shutdownFactory) { this.mQClientFactory.shutdown(); } log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; case SHUTDOWN_ALREADY: break; default: break; } } //...... }
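MQClientInstance定义了producerTable(以producer group为key);其registerProducer方法在group或producer为null时直接返回false,否则执行producerTable.putIfAbsent(group, producer),如果返回值不为null(即该group已经注册过)则打印warn日志并返回false,否则返回true;其unregisterProducer方法会执行producerTable.remove(group)以及unregisterClientWithLock,而后者在获取到lockHeartbeat锁之后执行unregisterClient,它会遍历brokerAddrTable中的每个broker地址,最后执行的是mQClientAPIImpl.unregisterClient。DefaultMQProducerImpl的start方法会调用registerProducer,注册失败时将serviceState重置为CREATE_JUST并抛出MQClientException;其shutdown方法在RUNNING状态下会调用unregisterProducer。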
原创声明,本文系作者授权云+社区发表,未经许可,不得转载。
如有侵权,请联系 yunjia_community@tencent.com 删除。
我来说两句