Spring Boot Starter 使用

最近更新时间:2023-10-07 14:40:42

我的收藏

操作场景

本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

下载 Demo或者前往GitHub 项目

操作步骤

步骤1:添加依赖

在 pom.xml 中添加依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.7</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.7</version>
</dependency>

步骤2:准备配置

在配置文件中添加配置信息。
server:
port: 8082

#rocketmq配置信息
rocketmq:
# tdmq-rocketmq服务接入地址
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080
# 生产者配置
producer:
# 生产者组名
group: group111
# 角色密钥
access-key: eyJrZXlJZC....
# 已授权的角色名称
secret-key: admin
# 消费者公共配置
consumer:
# 角色密钥
access-key: eyJrZXlJZC....
# 已授权的角色名称
secret-key: admin

# 用户自定义配置

producer1:
topic: testdev1
consumer1:
group: group111
topic: testdev1
subExpression: TAG1
consumer2:
group: group222
topic: testdev1
subExpression: TAG2
参数
说明
name-server
集群接入地址,在集群基本信息中,根据使用需求,使用不同的内网/公网接入地址



group
生产者 Group 的名称,在控制台 Group 页面复制。
secret-key
角色名称,在 集群管理 页面复制 accessSecret 复制。
access-key
角色密钥,在 集群管理 页面复制 accessKey 复制。



topic
Topic 的名称,在控制台 topic 页面复制。
subExpression
用来设置消息的 TAG。

步骤3:发送消息

1. 在需要发送消息的类中注入 RcoketMQTemplate
@Value("${rocketmq.producer1.topic}")
private String topic; // topic名称
@Autowired
private RocketMQTemplate rocketMQTemplate;
2. 发送消息,消息体可以是自定义对象,也可以是 Message 对象(org.springframework.messaging包中)。
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
/*------------------------------------------------------------------------*/
rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build())
3. 完整示例如下。
/**
* Description: 消息生产者
*/
@Service
public class SendMessage {
// 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:topic名称
@Value("${rocketmq.producer1.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送
*
* @param message 消息内容
* @param tags 订阅tags
*/
public void syncSend(String message, String tags) {
// springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: `topicName:tags`,不拼接标识无tag
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult = rocketMQTemplate.syncSend(destination,
MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_KEYS, "yourKey") // 指定业务key
.build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
}
}
说明
该示例为同步发送。异步发送,单向发送等请参见 Demo 或者前往 GitHub 项目

步骤4:消费消息

@Service
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", // 消费组,格式:group名称
// 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:topic名称
topic = "${rocketmq.consumer1.topic}",
selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
)
public class MessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
System.out.println("Tag1Consumer receive message:" + message);
}
}
可根据业务需求配置多个消费者。消费者其他配置可根据具体业务需求进行配置。
说明
完整示例参见下载 Demo 或者前往 GitHub 项目

步骤5:查看消费详情

发送完成消息后会得到一个消息 ID (messageID),开发者可以在 “消息查询” 页面查询刚刚发送的消息,如下图所示;并且可以查看特定消息的详情和轨迹等信息,详情见 消息查询 章节。