DefaultMQProducer是一个默认的消息生产者,可以支持发送普通消息和顺序消息。
DefaultMQProducer中定义了一些发送消息相关的属性,还提供了发送消息的相关方法,可以支持同步发送和异步发送,可以发往Broker,由Broker决定具体发往的Queue,也可以指定发往的Queue。
下面简单结合源码,对其API进行介绍:
//生产者组
private String producerGroup;
//创建Topic时的topicKey,在测试时可指定Broker自增模式
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
//每个Topic中默认有4个Queue来存储消息
private volatile int defaultTopicQueueNums = 4;
//默认发送超时3000ms
private int sendMsgTimeout = 3000;
//默认情况下,当消息体字节数超过4k时,消息会被压缩(Consumer收到消息会自动解压缩)
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步发送消息时,消息发送失败后的最大重试次数。RocketMQ在消息重试机制上有很好的支持,但是重试可能会引起重复消息的问题,这需要在逻辑上进行幂等处理。
private int retryTimesWhenSendFailed = 2;
//异步发送时的最大重试次数,类似retryTimesWhenSendFailed
private int retryTimesWhenSendAsyncFailed = 2;
//如果消息发送成功,但是返回SendResult != SendStatus.SEND_OK,是否尝试发往别的Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//默认最大消息长度:4M,当消息长度超过限制时,RocketMQ会自动抛出异常
private int maxMessageSize = 1024 * 1024 * 4;
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}
public DefaultMQProducer(RPCHook rpcHook) {
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}
//启动方法,发送消息前必须调用
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
//关闭方法,释放资源
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
}
//同步发送消息,该方法会阻塞直到消息发送成功
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
//同步发送消息,指定超时时间
@Override
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
//异步发送消息,并注册发送回调。该方法会立即返回,当发送完成后,Broker会回调SendCallback通知发送结果
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
//异步发送消息,并指定超时时间
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
//同步发送消息,发往指定队列
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
//异步发送消息到指定队列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
//创建一个Topic。RocketMQ可支持Topic的自动创建,也可手动调用createTopic方法创建。创建Topic时需指定Topic的key,测试时可使用Broker自增key的方式,但是实际生产情况下应使用具有业务意义的key。创建时还可以指定Topic的name、Topic中Queue的数量等。
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
}