前言 本文笔者是一波三折啊,很多人像我一样第一次在springboot里搞rocketmq的,遇到各种麻烦,我也是一样,就比如:
踩坑:
nameserver启动成功了,broker启动失败,然后broker日志文件没有生成,错误原因也找不到。 在java里发送消息时发送失败,幸好有报错信息[Send [3] times, still failed],就是重发三次也没发出去的意思。网上资料说是broker启动方式不对,然而并没有解决 ……
本文是简单的使用发送消息和消费消息,让大家了解一下流程。
正文 在此之前我们要启动一个nameserver和一个broker
上文我已经介绍了,链接如下:启动消息服务
第一步
创建一个maven项目,pom文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rocketmq_demo</groupId>
<artifactId>rocketmq_demo</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
</project>
第二步
创建启动类,RocketMQApp,这个启动类当作生产者
package com;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author yanlin
* @version v1.3
* @date 2019-01-24 3:43 PM
* @since v8.0
**/
@SpringBootApplication
public class RocketMQApp {
public static void main(String[] args) throws MQClientException, InterruptedException {
SpringApplication.run(RocketMQApp.class, args);
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
第三步
创建消费者 RocketMQConsumer,代码如下
package com.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @author yanlin
* @version v1.3
* @date 2019-01-24 3:24 PM
* @since v8.0
**/
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
最后一步
右键启动启动类-RocketMQApp,观察控制台:
主要发送消息的日志如下
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94180000, offsetMsgId=0A08406F00002A9F0000000000058B56, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94270001, offsetMsgId=0A08406F00002A9F0000000000058C08, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942A0002, offsetMsgId=0A08406F00002A9F0000000000058CBA, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=505]
………………
右键启动消费者-RocketMQConsumer,观察控制台如下
Receive New Messages 后面的就是消息主体
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807386, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807396, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058B56, commitLogOffset=363350, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94180000, WAIT=true, TAGS=TagA}, body=16]]]
…………
我们发送了20条,然后成功消费20条,这就是rocketmq最简单的应用案例了。以后的篇幅我就会讲解更细致的内容,让大家能适用任何场景下,让大家了解各个对象的原理。