本文主要研究一下 RocketMQ 的 registerProducer 与 unregisterProducer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
/**
 * Client-side instance that keeps the producers, consumers and admin tools registered under one
 * client id, keyed by group name.
 *
 * <p>Only the producer registration and unregistration path is shown in this excerpt; the elided
 * parts declare, among others, {@code lockHeartbeat}, {@code brokerAddrTable} and
 * {@code mQClientAPIImpl}, which the methods below use.
 */
public class MQClientInstance {
    /** Maximum time to wait for the heartbeat lock, in milliseconds. */
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    /** Identifier sent to brokers when unregistering this client. */
    private final String clientId;
    /** Wall-clock time captured when this instance was created. */
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    //......

    /**
     * Registers a producer under the given group unless that group is already taken.
     *
     * @param group    producer group name used as the key in {@code producerTable}
     * @param producer producer implementation to register
     * @return {@code true} if the producer was newly registered; {@code false} if either argument
     *         is {@code null} or the group already has a producer (a warning is logged)
     */
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        // putIfAbsent keeps the existing mapping, so a non-null result means the group is taken.
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }
        return true;
    }

    /**
     * Removes the producer group from {@code producerTable} and then unregisters it from the
     * brokers under the heartbeat lock.
     *
     * @param group producer group name to remove
     */
    public void unregisterProducer(final String group) {
        this.producerTable.remove(group);
        this.unregisterClientWithLock(group, null);
    }

    /**
     * Runs {@link #unregisterClient(String, String)} while holding {@code lockHeartbeat}.
     *
     * <p>If the lock cannot be acquired within {@link #LOCK_TIMEOUT_MILLIS} the unregistration is
     * skipped and a warning is logged. If the thread is interrupted while waiting for the lock,
     * the interruption is logged and the thread's interrupt status is restored so callers can
     * still observe it.
     *
     * @param producerGroup producer group to unregister, or {@code null}
     * @param consumerGroup consumer group to unregister, or {@code null}
     */
    private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
        try {
            if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    this.unregisterClient(producerGroup, consumerGroup);
                } catch (Exception e) {
                    log.error("unregisterClient exception", e);
                } finally {
                    this.lockHeartbeat.unlock();
                }
            } else {
                log.warn("lock heartBeat, but failed.");
            }
        } catch (InterruptedException e) {
            log.warn("unregisterClientWithLock exception", e);
            // Catching InterruptedException clears the flag; put it back for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends an unregister request for the given groups to every address in
     * {@code brokerAddrTable}.
     *
     * <p>The operation is best-effort: a failure for one address is logged and the remaining
     * addresses are still attempted. An interruption is treated the same way, but the thread's
     * interrupt status is restored before this method returns.
     *
     * @param producerGroup producer group to unregister, or {@code null}
     * @param consumerGroup consumer group to unregister, or {@code null}
     */
    private void unregisterClient(final String producerGroup, final String consumerGroup) {
        // Remember an interrupt rather than re-asserting it immediately, so the remaining
        // brokers are still contacted; the flag is restored once the loop is finished.
        boolean interrupted = false;
        try {
            for (Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
                String brokerName = brokerEntry.getKey();
                HashMap<Long, String> oneTable = brokerEntry.getValue();
                if (oneTable == null) {
                    continue;
                }
                for (Map.Entry<Long, String> addrEntry : oneTable.entrySet()) {
                    String addr = addrEntry.getValue();
                    if (addr == null) {
                        continue;
                    }
                    try {
                        this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                        log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, addrEntry.getKey(), addr);
                    } catch (RemotingException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (InterruptedException e) {
                        interrupted = true;
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (MQBrokerException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
    //......
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
/**
 * Producer implementation; this excerpt shows only the start-up path.
 */
public class DefaultMQProducerImpl implements MQProducerInner {
    //......

    /**
     * Starts the producer together with its client factory; equivalent to {@code start(true)}.
     *
     * @throws MQClientException see {@link #start(boolean)}
     */
    public void start() throws MQClientException {
        this.start(true);
    }

    /**
     * Starts the producer if it has just been created, then sends a heartbeat to all brokers.
     *
     * <p>The state is set to {@code START_FAILED} before any start-up work begins and only becomes
     * {@code RUNNING} once every step has completed. If the producer group is already registered
     * with the client factory, the state is put back to {@code CREATE_JUST} and an exception is
     * thrown.
     *
     * @param startFactory whether the client factory should be started as part of this call
     * @throws MQClientException if the producer group is already registered, or if the producer
     *                           is in state {@code RUNNING}, {@code START_FAILED} or
     *                           {@code SHUTDOWN_ALREADY}
     */
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY: {
                String stateMessage = "The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK);
                throw new MQClientException(stateMessage, null);
            }
            case CREATE_JUST: {
                // Pessimistically mark the start as failed; flipped to RUNNING at the very end.
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                boolean innerGroup = this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP);
                if (!innerGroup) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                if (!this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this)) {
                    // Registration was refused: undo the state change before reporting the clash.
                    this.serviceState = ServiceState.CREATE_JUST;
                    String duplicateMessage = "The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL);
                    throw new MQClientException(duplicateMessage, null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    this.mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
    //......
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
/**
 * Producer implementation; this excerpt shows only the shutdown path.
 */
public class DefaultMQProducerImpl implements MQProducerInner {
    //......

    /**
     * Shuts down the producer together with its client factory; equivalent to
     * {@code shutdown(true)}.
     */
    public void shutdown() {
        this.shutdown(true);
    }

    /**
     * Shuts down the producer if it is currently {@code RUNNING}; in every other state the call
     * does nothing.
     *
     * <p>A running producer is first unregistered from the client factory, then its async sender
     * executor is shut down, and finally the state becomes {@code SHUTDOWN_ALREADY}.
     *
     * @param shutdownFactory whether the client factory should be shut down as well
     */
    public void shutdown(final boolean shutdownFactory) {
        switch (this.serviceState) {
            case RUNNING: {
                this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
                this.defaultAsyncSenderExecutor.shutdown();

                if (shutdownFactory) {
                    this.mQClientFactory.shutdown();
                }

                log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            }
            // Nothing to release when the producer never ran or has already been shut down.
            case CREATE_JUST:
            case SHUTDOWN_ALREADY:
            default:
                break;
        }
    }
    //......
}
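MQClientInstance 定义了 producerTable,其 registerProducer 方法会执行 producerTable.putIfAbsent(group, producer),如果返回值不为 null(即该 group 已经注册过)则返回 false;其 unregisterProducer 方法会执行 producerTable.remove(group) 以及 unregisterClientWithLock,而后者在获取 lockHeartbeat 锁之后主要执行的是 unregisterClient,它会遍历 brokerAddrTable,最后对每个 broker 地址执行 mQClientAPIImpl.unregisterClient。DefaultMQProducerImpl 的 start 方法会调用 registerProducer,注册失败则抛出 MQClientException;其 shutdown 方法在 RUNNING 状态下会调用 unregisterProducer。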
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。