本节演示使用SpringBoot整合RocketMQ的入门程序,包括消息的生产端和消费端两个工程。
开发环境:JDK 1.8、IntelliJ IDEA、Maven 3.5.3、Spring Boot 2.0.3、RocketMQ 4.3.0
首先,引入RocketMQ的客户端依赖:
<!-- Inherit from the Spring Boot 2.0.3.RELEASE starter parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>
<!-- Build properties: UTF-8 encodings, Java 1.8, and the RocketMQ client version referenced below. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<rocketmq.version>4.3.0</rocketmq.version>
</properties>
<dependencies>
<!-- Spring Boot web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot test starter. NOTE(review): no test scope is declared here; confirm whether that is intended. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- Lombok; the Java sources use its @Slf4j annotation. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- RocketMQ client dependency; its version is taken from the rocketmq.version property. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
在Spring Boot配置文件(如application.yml)中配置RocketMQ NameServer的地址:
# RocketMQ name server address (host:port). The producer and consumer read it through
# @Value("${spring.rocketmq.namesrvAddr}"), so the keys must be nested as shown.
spring:
  rocketmq:
    namesrvAddr: localhost:9876
package william.rmq.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the RocketMQ producer-side Spring Boot application.
 *
 * <p>Created 2018/9/7 11:44.
 *
 * @author ZhangShenao
 */
@SpringBootApplication
public class RocketMQProducerApplication {

    /**
     * Boots the Spring application context for the producer side.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that overrides such as
     *             {@code --spring.rocketmq.namesrvAddr=host:9876} are honoured
     */
    public static void main(String[] args) {
        // Pass args through: without them, command-line property overrides are silently ignored.
        SpringApplication.run(RocketMQProducerApplication.class, args);
    }
}
package william.rmq.producer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 10:58
* @Description:RocketMQ消息生产者
*/
@Service
@Slf4j
public class MessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
@PostConstruct
public void start(){
try {
producer.setNamesrvAddr(namesrvAddr);
producer.start();
log.info("Message Producer Start...");
System.err.println("Message Producer Start...");
}catch (Exception e){
log.error("Message Producer Start Error!!",e);
}
}
public void sendMessage(String data, String topic, String tags, String keys) {
try {
byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message(topic, tags, keys, messageBody);
producer.send(mqMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Message Producer: Send Message Success {}", sendResult);
System.err.println("Message Producer: Send Message Success {}" + sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("Message Producer: Send Message Error ", throwable);
System.err.println("Message Producer: Send Message Error " + throwable);
}
});
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
package william.rmq.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the RocketMQ consumer-side Spring Boot application.
 *
 * <p>Created 2018/9/7 11:47.
 *
 * @author ZhangShenao
 */
@SpringBootApplication
public class RocketMQConsumerApplication {

    /**
     * Boots the Spring application context for the consumer side.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that overrides such as
     *             {@code --spring.rocketmq.namesrvAddr=host:9876} are honoured
     */
    public static void main(String[] args) {
        // Pass args through: without them, command-line property overrides are silently ignored.
        SpringApplication.run(RocketMQConsumerApplication.class, args);
    }
}
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
//指定nameserver地址
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}"
, msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
} catch (Exception e) {
log.error("Consume Message Error!!", e);
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
package william.rmq.producer.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import william.rmq.producer.RocketMQProducerApplication;
import william.rmq.producer.quickstart.MessageProducer;
/**
 * Integration test that sends a handful of messages through {@link MessageProducer}.
 *
 * <p>Created 2018/9/7 12:39.
 *
 * @author ZhangShenao
 */
@SpringBootTest(classes = RocketMQProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestSendMessage {

    /** Grace period that lets the asynchronous send callbacks complete before the test ends. */
    private static final long CALLBACK_WAIT_MILLIS = 3000L;

    @Autowired
    private MessageProducer producer;

    /**
     * Sends five messages ("Message-1" .. "Message-5") to the "DefaultCluster" topic.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the callbacks
     */
    @Test
    public void testSendMessage() throws InterruptedException {
        String message = "Message-";
        String topic = "DefaultCluster";
        String tags = "Tags";
        String keys = "Keys-";
        for (int i = 1; i <= 5; i++) {
            producer.sendMessage(message + i, topic, tags, keys + i);
        }
        // sendMessage is asynchronous and exposes no completion handle, so wait briefly;
        // otherwise the test (and the JVM) may finish before the sends have gone out.
        Thread.sleep(CALLBACK_WAIT_MILLIS);
    }
}
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70001, offsetMsgId=C0A81FFA00002A9F0000000000001064, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=11], queueOffset=1]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECED0003, offsetMsgId=C0A81FFA00002A9F0000000000000EEC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=9], queueOffset=0]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70000, offsetMsgId=C0A81FFA00002A9F0000000000000FA8, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=4], queueOffset=3]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE80002, offsetMsgId=C0A81FFA00002A9F0000000000001120, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=1], queueOffset=2]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECF50004, offsetMsgId=C0A81FFA00002A9F00000000000011DC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=5], queueOffset=3]
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70001,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECF50004,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE80002,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70000,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECED0003,topic: DefaultCluster,tags: Tags