Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现
比如我们用到了RabbitMQ或者Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,想往另外一种消息队列进行迁移,这时候无疑是就是灾难,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
它屏蔽了各种MQ的差异,统一了编程模型,业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可
1、应用模型
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,通过我们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
3、发布/订阅
简单的讲就是一种生产者,消费者模式。发布者是生产,将输出发布到数据中心,订阅者是消费者,订阅自己感兴趣的数据。当有数据到达数据中心时,就把数据发送给对应的订阅者
4、消费组
直观的理解就是一群消费者一起处理消息。需要注意的是:每个发送到消费组的数据,仅由消费组中的一个消费者处理。
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,这就很可能会出现重复消费的问题,在某些场景下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.{channel-name}.group属性即可。
通常情况下,当有一个应用绑定到目的地的时候,最好指定消费消费组。扩展Spring Cloud Stream应用程序时,必须为每个输入绑定指定一个使用者组。这样做可以防止应用程序的实例接收重复的消息,而且所有拥有订阅主题的消费组都是持久化的,除了匿名消费组(即不设置group)
5、分区
有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,在消费组中我们可以保证消息不会被重复消费,但是在同组下有多个实例的时候,我们无法确定每次处理消息的是不是被同一消费者消费,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了
示例主要演示了当数据库配置信息变更,通过springcloud-stream进行变更通知推送,并动态切换数据源,如果配置数据库url发生变更,同时记录变更日志到数据库,示例开始之前,让大家再次熟悉spring cloud stream应用模型图
本示例的Middleware为rabbitmq。
1、通过docker创建并启动一个rabbitmq
docker pull rabbitmq:3-management
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
2、pom引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
3、自定义消息通道
public interface LogStreamBinder {
String DB_CONFIG_TOPIC = "dbConfigTopic";
String OPERATE_LOG_TOPIC = "operateLogTopic";
@Input(DB_CONFIG_TOPIC)
SubscribableChannel dbConfig();
@Output(OPERATE_LOG_TOPIC)
MessageChannel operateLog();
}
@Input注解的参数则表示了输入消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。@Output注解中描述了输出消息通道的名称,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法
4、在启动类上加上@EnableBinding,进行消息通道绑定
@SpringBootApplication
@EnableBinding(LogStreamBinder.class)
public class LogApplication{}
5、监听自定义的消息通道
@StreamListener(value= LogStreamBinder.DB_CONFIG_TOPIC)
public void changeDbConfig(String dbConfigDTOJson){
}
6、发送消息
@Autowired
private DbConfigStreamBinder dbConfigBinder;
@Override
public String changeDbConfig(DbConfigInfoDTO dbConfigInfoDTO) {
String json = JSON.toJSONString(dbConfigInfoDTO);
boolean sendOk = dbConfigBinder.dbConfig().send(MessageBuilder.withPayload(json).build());
if(sendOk){
return "success";
}
return "fail";
}
1、给消费者设置消费组和主题
设置消费组: spring.cloud.stream.bindings.<通道名>.group=<消费组名>
设置主题: spring.cloud.stream.bindings.<通道名>.destination=<主题名>
给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>
2、消费者开启分区,指定实例数量与实例索引
开启消费分区: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
消费实例数量: spring.cloud.stream.instanceCount=1 (具体指定)
实例索引: spring.cloud.stream.instanceIndex=1 #设置当前实例的索引值
3、生产者指定分区键
分区键: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
分区数量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>
大家大体都知道消息队列具体削峰填谷、异步、解耦等作用,当我们项目中可能涉及到引入多种消息队列时,则我们就可以考虑一下引用spring cloud stream来统一编程模型,让我们不再关注具体消息中间件,更专注于业务开发
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-stream-saveLog https://github.com/lyb-geek/springboot-learning/tree/master/springboot-stream-changeDbInfo
Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收
https://www.cnblogs.com/hellxz/p/9396282.html#_label2
SpringCloud实战9-Stream消息驱动
https://www.cnblogs.com/huangjuncong/p/9102843.html
SpringCloud 之 Stream
https://www.jianshu.com/p/404fc32122d1