专栏首页码匠的流水账聊聊rocketmq的getOrCreateMQClientInstance
原创

聊聊rocketmq的getOrCreateMQClientInstance

本文主要研究一下rocketmq的getOrCreateMQClientInstance

getOrCreateMQClientInstance

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientManager.java

public class MQClientManager {
    private final static InternalLogger log = ClientLogger.getLog();
    private static MQClientManager instance = new MQClientManager();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();
​
    private MQClientManager() {
​
    }
​
    public static MQClientManager getInstance() {
        return instance;
    }
​
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
        return getOrCreateMQClientInstance(clientConfig, null);
    }
​
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
​
        return instance;
    }
​
    public void removeClientFactory(final String clientId) {
        this.factoryTable.remove(clientId);
    }
}
  • MQClientManager提供了getOrCreateMQClientInstance方法用于根据clientConfig及rpcHook来创建MQClientInstance;它使用factoryTable来存储clientId与MQClientInstance的映射关系,只要clientId是一样的,获取的就是相同的MQClientInstance;而clientId则由clientConfig.buildMQClientId()计算出来

ClientConfig

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/ClientConfig.java

public class ClientConfig {
    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
    private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
    private String clientIP = RemotingUtil.getLocalAddress();
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    protected String namespace;
    protected AccessChannel accessChannel = AccessChannel.LOCAL;
​
    /**
     * Pulling topic information interval from the named server
     */
    private int pollNameServerInterval = 1000 * 30;
    /**
     * Heartbeat interval in microseconds with message broker
     */
    private int heartbeatBrokerInterval = 1000 * 30;
    /**
     * Offset persistent interval for consumer
     */
    private int persistConsumerOffsetInterval = 1000 * 5;
    private long pullTimeDelayMillsWhenException = 1000;
    private boolean unitMode = false;
    private String unitName;
    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
​
    private boolean useTLS = TlsSystemConfig.tlsEnable;
​
    private LanguageCode language = LanguageCode.JAVA;
​
    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
​
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
​
        return sb.toString();
    }
​
    public String getClientIP() {
        return clientIP;
    }
​
    public String getInstanceName() {
        return instanceName;
    }
​
    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }
​
    //......
}
  • ClientConfig的buildMQClientId会根据clientIP、instanceName、unitName来构建;clientIP默认值为RemotingUtil.getLocalAddress();instanceName默认值为System.getProperty("rocketmq.client.name", "DEFAULT");ClientConfig还提供了一个changeInstanceNameToPID方法,在instanceName值为默认值的时候,将其改为UtilAll.getPid();unitName默认为空

DefaultMQProducerImpl.start

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
​
    //......
​
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
​
                this.checkConfig();
​
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
​
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
​
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
​
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
​
                if (startFactory) {
                    mQClientFactory.start();
                }
​
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
​
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
​
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
​
    //......
}
  • DefaultMQProducerImpl的start方法在defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)为false的时候,会执行defaultMQProducer.changeInstanceNameToPID()方法

DefaultMQPushConsumerImpl.start

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
​
    //......
​
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
​
                this.checkConfig();
​
                this.copySubscription();
​
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
​
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
​
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
​
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
​
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
​
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
​
                this.consumeMessageService.start();
​
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
​
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
​
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
​
    //......
}
  • DefaultMQPushConsumerImpl的start方法在defaultMQPushConsumer.getMessageModel()为MessageModel.CLUSTERING时,会执行defaultMQPushConsumer.changeInstanceNameToPID()方法

RocketMQUtil.getInstanceName

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/RocketMQUtil.java

public class RocketMQUtil {
​
    //......
​
    public static String getInstanceName(RPCHook rpcHook, String identify) {
        String separator = "|";
        StringBuilder instanceName = new StringBuilder();
        SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
        instanceName.append(sessionCredentials.getAccessKey())
            .append(separator).append(sessionCredentials.getSecretKey())
            .append(separator).append(identify)
            .append(separator).append(UtilAll.getPid());
        return instanceName.toString();
    }
​
    //......
}
  • RocketMQUtil提供了getInstanceName方法,它根据rpcHook的信息、identify以及UtilAll.getPid()来构建

initRocketMQPushConsumer

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
​
    private ApplicationContext applicationContext;
​
    //......
​
    private void initRocketMQPushConsumer() throws MQClientException {
        Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
        Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
        Assert.notNull(nameServer, "Property 'nameServer' is required");
        Assert.notNull(topic, "Property 'topic' is required");
​
        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
            consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                    this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
​
        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        if (customizedNameServer != null) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if (accessChannel != null) {
            consumer.setAccessChannel(accessChannel);
        }
        consumer.setConsumeThreadMax(consumeThreadMax);
        if (consumeThreadMax < consumer.getConsumeThreadMin()) {
            consumer.setConsumeThreadMin(consumeThreadMax);
        }
        consumer.setConsumeTimeout(consumeTimeout);
​
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }
​
        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }
​
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
​
        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        }
​
    }
​
    //......
}
  • DefaultRocketMQListenerContainer的initRocketMQPushConsumer方法在rpcHook不为null的时候,会使用RocketMQUtil.getInstanceName(rpcHook, consumerGroup)来设置consumer的instanceName

createTransactionMQProducer

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
​
    private DefaultMQProducer producer;
​
    //......
​
    private TransactionMQProducer createTransactionMQProducer(String name,
        RocketMQLocalTransactionListener transactionListener,
        ExecutorService executorService, RPCHook rpcHook) {
        Assert.notNull(producer, "Property 'producer' is required");
        Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
        TransactionMQProducer txProducer;
        if (Objects.nonNull(rpcHook)) {
            txProducer = new TransactionMQProducer(name, rpcHook);
            txProducer.setVipChannelEnabled(false);
            txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
        } else {
            txProducer = new TransactionMQProducer(name);
        }
        txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
​
        txProducer.setNamespace(producer.getNamespace());
        txProducer.setNamesrvAddr(producer.getNamesrvAddr());
        if (executorService != null) {
            txProducer.setExecutorService(executorService);
        }
​
        txProducer.setSendMsgTimeout(producer.getSendMsgTimeout());
        txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed());
        txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed());
        txProducer.setMaxMessageSize(producer.getMaxMessageSize());
        txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch());
        txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK());
​
        return txProducer;
    }
​
    //......
}
  • RocketMQTemplate的createTransactionMQProducer方法在rpcHook不为null的时候会使用RocketMQUtil.getInstanceName(rpcHook, name)来设置txProducer的instanceName

小结

  • MQClientManager提供了getOrCreateMQClientInstance方法用于根据clientConfig及rpcHook来创建MQClientInstance;它使用factoryTable来存储clientId与MQClientInstance的映射关系,只要clientId是一样的,获取的就是相同的MQClientInstance;而clientId则由clientConfig.buildMQClientId()计算出来
  • ClientConfig的buildMQClientId会根据clientIP、instanceName、unitName来构建;clientIP默认值为RemotingUtil.getLocalAddress();instanceName默认值为System.getProperty("rocketmq.client.name", "DEFAULT");ClientConfig还提供了一个changeInstanceNameToPID方法,在instanceName值为默认值的时候,将其改为UtilAll.getPid();unitName默认为空
  • RocketMQUtil提供了getInstanceName方法,它根据rpcHook的信息、identify以及UtilAll.getPid()来构建;DefaultRocketMQListenerContainer的initRocketMQPushConsumer方法在rpcHook不为null的时候,会使用RocketMQUtil.getInstanceName(rpcHook, consumerGroup)来设置consumer的instanceName;RocketMQTemplate的createTransactionMQProducer方法在rpcHook不为null的时候会使用RocketMQUtil.getInstanceName(rpcHook, name)来设置txProducer的instanceName

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊rocketmq的getOrCreateMQClientInstance

    本文主要研究一下rocketmq的getOrCreateMQClientInstance

    codecraft
  • 聊聊rocketmq的PushConsumerImpl

    io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

    codecraft
  • 聊聊elasticsearch的LagDetector

    elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/...

    codecraft
  • 聊聊rocketmq的getOrCreateMQClientInstance

    本文主要研究一下rocketmq的getOrCreateMQClientInstance

    codecraft
  • 【React】249-当我开始使用React 时,我希望我知道这些知识

      可以给每个方法加上.bind(this)来解决 this 指向的问题,因为大多数教程都告诉你这样做。如果你有几个受控组件,那么constructor(){}...

    pingan8787
  • ActiveMQ源码分析——生产消息

    创建Session时,第一个传入是否开启事务,第二个传入session提交消费消息的方式 接下来看源码处理,生产者id对象由当前sessionID加上使用内部s...

    歪歪梯
  • iScroll学习小结

    最近项目需要实现一个fixed标题栏的功能,很普通的功能,实现核心也是在sroll事件中切换到fixed状态即可,但是在某些版本ios的某些内核中,在惯性滚动过...

    IMWeb前端团队
  • lottie系列文章(四):源码分析——svg渲染

    lottie为全局变量,主要有一个loadAnimation的方法,来加载和解析json,播放动画。

    IMWeb前端团队
  • HTML5 Canvas炫酷的火焰风暴动画

    越陌度阡
  • 【译】理解JavaScript中的This,Bind,Call和Apply

    this关键词在JavaScript中是个很重要的概念,也是一个对初学者和学习其他语言的人来说晦涩难懂。在JavaScript中,this是一个对象的引用。th...

    嘉明

扫码关注云+社区

领取腾讯云代金券