Spring Cloud Stream 使用

最近更新时间:2023-09-08 11:00:12

我的收藏

操作场景

本文以调用 Spring Cloud Stream 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

下载 Demo 或者前往 GitHub 项目

操作步骤

步骤1:引入依赖

在 pom.xml 中引入 spring-cloud-starter-stream-rocketmq 相关依赖。当前建议版本 2021.0.5.0,同时需要排除依赖,使用4.9.7的 SDK。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.0.5.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.7</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.7</version>
</dependency>

步骤2:添加配置

在配置文件中增加 RocketMQ 相关配置。
spring:
cloud:
stream:
rocketmq:
binder:
# 服务地址全称
name-server: rmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080
# 角色名称
secret-key: admin
# 角色密钥
access-key: eyJrZXlJZ...
# producer group
group: producerGroup
bindings:
# channel名称, 与spring.cloud.stream.bindings下的channel名称对应
Topic-TAG1-Input:
consumer:
# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
subscription: TAG1
# channel名称
Topic-TAG2-Input:
consumer:
subscription: TAG2
bindings:
# channel名称
Topic-send-Output:
# 指定topic, 对应创建的topic名称
destination: TopicTest
content-type: application/json
# channel名称
Topic-TAG1-Input:
destination: TopicTest
content-type: application/json
group: consumer-group1
# channel名称
Topic-TAG2-Input:
destination: TopicTest
content-type: application/json
group: consumer-group2
注意
配置方面 2.2.5-RocketMQ-RC12.2.5.RocketMQ.RC2 的订阅配置项为 subscription , 其他低版本订阅配置项为 tags
其他版本完整配置项参考如下:
spring:
cloud:
stream:
rocketmq:
bindings:
# channel名称, 与spring.cloud.stream.bindings下的channel名称对应
Topic-test1:
consumer:
# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
tags: TAG1
# channel名称
Topic-test2:
consumer:
tags: TAG2
binder:
# 服务地址全称
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080
# 角色名称
secret-key: admin
# 角色密钥
access-key: eyJrZXlJZ...
bindings:
# channel名称
Topic-send:
# 指定topic,
destination: topic1
content-type: application/json
# 要使用group全称
group: group1
# channel名称
Topic-test1:
destination: topic1
content-type: application/json
group: group1
# channel名称
Topic-test2:
destination: topic1
content-type: application/json
group: group2
参数
说明
name-server
集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
secret-key
角色名称,在 集群管理 页面复制 accessSecret 复制。
access-key
角色密钥,在 集群管理 页面复制 accessKey 复制。



group
生产者 Group 的名称,在控制台 Group 页面复制。
destination
Topic 的名称,在控制台 topic 页面复制。

步骤3:配置 channel

channel 分为输入和输出,可根据自己的业务进行单独配置。
/**
* 自定义通道 Binder
*/
public interface CustomChannelBinder {

/**
* 发送消息(消息生产者)
* 绑定配置中的channel名称
*/
@Output("Topic-send-Output")
MessageChannel sendChannel();


/**
* 接收消息1(消费者1)
* 绑定配置中的channel名称
*/
@Input("Topic-TAG1-Input")
MessageChannel testInputChannel1();

/**
* 接收消息2(消费者2)
* 绑定配置中的channel名称
*/
@Input("Topic-TAG2-Input")
MessageChannel testInputChannel2();
}


步骤4:添加注解

在配置类或启动类上添加相应注解,如果有多个 binder 配置,都要在此注解中进行指定。
@EnableBinding({CustomChannelBinder.class})

步骤5:发送消息

1. 在要发送消息的类中,注入 CustomChannelBinder
@Autowired
private CustomChannelBinder channelBinder;
2. 发送消息,调用对应的输出流 channel 进行消息发送。
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
channelBinder.sendChannel().send(message);

步骤6:消费消息

@Service
public class StreamConsumer {
private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);

/**
* 监听channel (配置中的channel 名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-TAG1-Input")
public void receive(String messageBody) {
logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);
}

/**
* 监听channel (配置中的channel 名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-TAG2-Input")
public void receive2(String messageBody) {
logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);
}
}

步骤7:本地测试

本地启动项目之后,可以从控制台看到启动成功。
浏览器访问 http://localhost:8080/test-simple可以看到发送成功。观察开发 IDE 的输出日志。
2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: 通过stream发送消息,messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: 通过stream收到消息,messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}

可以看到。发送了一条 TAG1 的消息,同时也只有 TAG1 的订阅者收到了消息。
说明
具体使用可参见 GitHub DemoSpring cloud stream 官网