从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。
下面我们来分析下 DefaultMQProducer 启动的过程。
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("....");
......
producer.start();
......
}catch(Exception e){}
}
}
创建 DefaultMQProducer 实例,然后制定一些参数,调用 start() 方法就开启了生产者。
public class DefaultMQProducer extends ClientConfig implements MQProducer {
//producer 组名
private String producerGroup;
// Topic 名字,默认为“TBW102”
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
// 创建 Topic 默认的4个队列
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间
private int sendMsgTimeout = 3000;
// 当发送的消息大于 4K 时,开始压缩消息。
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步发送消息,发送失败时再尝试发送2次数
private int retryTimesWhenSendFailed = 2;
// 异步发送消息,发送失败时再尝试发送2次数
private int retryTimesWhenSendAsyncFailed = 2;
//发送broker消息存储失败时,是否尝试去试发送其他的broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//最大允许发送字节数
private int maxMessageSize = 1024 * 1024 * 4; // 4M
DefaultMQProducer 中定义的类属性
DefaultMQProducer 还有可以设置其他的参数,这里就不说明了。
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. 只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer
case CREATE_JUST:
//2. 防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
this.serviceState = ServiceState.START_FAILED;
// 3. 检查 groupName 是否合法
this.checkConfig();
//4. 判断是否需要设置 InstanceName 。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. 构建 MQClientInstance 对象。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7.
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8.
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 5.
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$
,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255
);
groupName 也不能等于 DEFAULT_PRODUCER
。只要满足上面条件,则抛异常信息。
4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"
,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
.....
}
5、构建 MQClientInstance 对象。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}