下面分别演示两种消息模式。
使用DefaultMQProducer,发送8条消息:
package william.rmq.producer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import william.rmq.producer.common.CommonSendCallback;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 10:58
* @Description:RocketMQ消息生产者
*/
@Service
@Slf4j
public class MessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
@PostConstruct
public void start(){
try {
producer.setNamesrvAddr(namesrvAddr);
producer.setRetryTimesWhenSendFailed(RocketMQConstant.MAX_RETRY_TIMES);
producer.start();
log.info("Message Producer Start...");
System.err.println("Message Producer Start...");
}catch (Exception e){
log.error("Message Producer Start Error!!",e);
}
String message = "Message-";
String topic = RocketMQConstant.TEST_TOPIC_NAME;
String tags = "Tags";
String keys = "Keys-";
for (int i = 1;i <= 8;i++){
sendMessage(message + i,topic,tags,keys + i);
}
}
public void sendMessage(String data, String topic, String tags, String keys) {
try {
byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message(topic, tags, keys, messageBody);
producer.send(mqMsg, new CommonSendCallback());
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费超时时间(分钟)
consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
//订阅主题
consumer.subscribe(RocketMQConstant.TEST_TOPIC_NAME, "*");
//注册消息监听器
consumer.registerMessageListener(this);
//设置批量消费最大消息数,这里设置为逐条消费
consumer.setConsumeMessageBatchMaxSize(1);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
先启动2个Consumer程序,再启动单个Producer程序,两个Consumer端控制台输出如下:
Consumer1:
Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30006,topic: DefaultCluster,tags: Tags,messageBody: Message-7
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30007,topic: DefaultCluster,tags: Tags,messageBody: Message-8
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70002,topic: DefaultCluster,tags: Tags,messageBody: Message-1
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30005,topic: DefaultCluster,tags: Tags,messageBody: Message-6
Consumer2:
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D80003,topic: DefaultCluster,tags: Tags,messageBody: Message-4
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70000,topic: DefaultCluster,tags: Tags,messageBody: Message-2
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70001,topic: DefaultCluster,tags: Tags,messageBody: Message-3
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30004,topic: DefaultCluster,tags: Tags,messageBody: Message-5
可以看到,使用集群消息模式后,两个Consumer负载均衡消费了8条消息。