前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 生产者 Producer 启动过程

RocketMQ 生产者 Producer 启动过程

作者头像
java404
发布2018-12-24 14:22:06
2.7K0
发布2018-12-24 14:22:06
举报
文章被收录于专栏:java 成神之路java 成神之路

MQProducer

从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。

  • DefaultMQProducer: 我们常用的生产者。
  • TransactionMQProducer:继承自 DefaultMQProducer,并支持事务消息。

下面我们来分析下 DefaultMQProducer 启动的过程。

启动示例
代码语言:javascript
复制
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("....");
            ......
            producer.start();
            ......
        }catch(Exception e){}
    }
}

创建 DefaultMQProducer 实例,然后制定一些参数,调用 start() 方法就开启了生产者。

DefaultMQProducer 参数分析
代码语言:javascript
复制
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //producer 组名
    private String producerGroup;

    // Topic 名字,默认为“TBW102”
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    // 创建 Topic 默认的4个队列
    private volatile int defaultTopicQueueNums = 4;

    // 发送消息超时时间
    private int sendMsgTimeout = 3000;

    // 当发送的消息大于 4K 时,开始压缩消息。
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    //同步发送消息,发送失败时再尝试发送2次数
    private int retryTimesWhenSendFailed = 2;

    // 异步发送消息,发送失败时再尝试发送2次数
    private int retryTimesWhenSendAsyncFailed = 2;

    //发送broker消息存储失败时,是否尝试去试发送其他的broker
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    //最大允许发送字节数
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

DefaultMQProducer 中定义的类属性

  • producerGroup: 生产者组名
  • createTopicKey :Topic 名字,默认为“TBW102”
  • defaultTopicQueueNums :创建 Topic 默认的4个队列
  • sendMsgTimeout :默认发送消息3秒超时
  • compressMsgBodyOverHowmuch :当发送的消息大于 4K 时,开始压缩消息。
  • retryTimesWhenSendFailed :同步发送消息,发送失败时再尝试发送2次数。
  • retryTimesWhenSendAsyncFailed :异步发送消息,发送失败时再尝试发送2次数
  • retryAnotherBrokerWhenNotStoreOK :发送broker消息存储失败时,是否尝试去试发送其他的broker

DefaultMQProducer 还有可以设置其他的参数,这里就不说明了。

Producer 启动
代码语言:javascript
复制
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 1. 只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer
        case CREATE_JUST:
            //2. 防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
            this.serviceState = ServiceState.START_FAILED;
            // 3. 检查 groupName 是否合法
            this.checkConfig();

            //4. 判断是否需要设置 InstanceName 。
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // 5. 构建 MQClientInstance 对象。
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 6.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // 7.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // 8.
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 5.
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
  1. 启动Producer的时候判断 serviceState 的当前状态,只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer。否则抛出异常信息。

2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。

3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255); groupName 也不能等于 DEFAULT_PRODUCER。只要满足上面条件,则抛异常信息。

4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。

代码语言:javascript
复制
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
public static int getPid() {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName(); // format: "pid@hostname"
    try {
        return Integer.parseInt(name.substring(0, name.indexOf('@')));
    .....
}

5、构建 MQClientInstance 对象。

代码语言:javascript
复制
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
  • 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
  • 如果 factoryTable 中是不已经存在 MQClientInstance 实例,则创建。
代码语言:javascript
复制
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    this.mQAdminImpl = new MQAdminImpl(this);

    this.pullMessageService = new PullMessageService(this);

    this.rebalanceService = new RebalanceService(this);

    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

未完待续。。。。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MQProducer
    • 启动示例
      • DefaultMQProducer 参数分析
        • Producer 启动
        • 未完待续。。。。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档