前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot实战之stream流式消息驱动

springboot实战之stream流式消息驱动

作者头像
lyb-geek
发布2019-08-21 17:06:18
4.5K0
发布2019-08-21 17:06:18
举报
文章被收录于专栏:Linyb极客之路

什么是Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现

为什么需要Spring Cloud Stream消息驱动?

比如我们用到了RabbitMQ或者Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,想往另外一种消息队列进行迁移,这时候无疑是就是灾难,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

它屏蔽了各种MQ的差异,统一了编程模型,业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可

Spring Cloud Stream相关概念简介

1、应用模型

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,通过我们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

  • Inputs 接收消息的通道
  • Output 发送消息的通道
  • Binder 可理解为一个抽象的中间件,应用通过在spring cloud stream中所注入的inputs,outputs通道来跟外界消息通信,而这些通道又是通过具体中间件的Binder实现来连接到消息队列的服务器上。有了Binder,甚至可以不改一行代码,就切换中间件的类型
  • Middleware 具体的消息中间件

3、发布/订阅

简单的讲就是一种生产者,消费者模式。发布者是生产,将输出发布到数据中心,订阅者是消费者,订阅自己感兴趣的数据。当有数据到达数据中心时,就把数据发送给对应的订阅者

4、消费组

直观的理解就是一群消费者一起处理消息。需要注意的是:每个发送到消费组的数据,仅由消费组中的一个消费者处理。

默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,这就很可能会出现重复消费的问题,在某些场景下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.{channel-name}.group属性即可。

通常情况下,当有一个应用绑定到目的地的时候,最好指定消费消费组。扩展Spring Cloud Stream应用程序时,必须为每个输入绑定指定一个使用者组。这样做可以防止应用程序的实例接收重复的消息,而且所有拥有订阅主题的消费组都是持久化的,除了匿名消费组(即不设置group)

5、分区

有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,在消费组中我们可以保证消息不会被重复消费,但是在同组下有多个实例的时候,我们无法确定每次处理消息的是不是被同一消费者消费,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了

Spring Cloud Stream 示例

示例主要演示了当数据库配置信息变更,通过springcloud-stream进行变更通知推送,并动态切换数据源,如果配置数据库url发生变更,同时记录变更日志到数据库,示例开始之前,让大家再次熟悉spring cloud stream应用模型图

本示例的Middleware为rabbitmq。

1、通过docker创建并启动一个rabbitmq

代码语言:javascript
复制
docker pull rabbitmq:3-management

docker run -d  --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

2、pom引入依赖

代码语言:javascript
复制
<dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>

3、自定义消息通道

代码语言:javascript
复制
public interface LogStreamBinder {

  String DB_CONFIG_TOPIC = "dbConfigTopic";

  String OPERATE_LOG_TOPIC = "operateLogTopic";


  @Input(DB_CONFIG_TOPIC)
  SubscribableChannel dbConfig();

  @Output(OPERATE_LOG_TOPIC)
  MessageChannel operateLog();

}

@Input注解的参数则表示了输入消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。@Output注解中描述了输出消息通道的名称,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法

4、在启动类上加上@EnableBinding,进行消息通道绑定

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(LogStreamBinder.class)
public class LogApplication{}

5、监听自定义的消息通道

代码语言:javascript
复制
@StreamListener(value= LogStreamBinder.DB_CONFIG_TOPIC)
  public void changeDbConfig(String dbConfigDTOJson){
  }

6、发送消息

代码语言:javascript
复制
@Autowired
  private DbConfigStreamBinder dbConfigBinder;

  @Override
  public String changeDbConfig(DbConfigInfoDTO dbConfigInfoDTO) {
    String json = JSON.toJSONString(dbConfigInfoDTO);
    boolean sendOk = dbConfigBinder.dbConfig().send(MessageBuilder.withPayload(json).build());
    if(sendOk){
      return "success";
    }

    return "fail";
  }

常用配置

1、给消费者设置消费组和主题

代码语言:javascript
复制
设置消费组: spring.cloud.stream.bindings.<通道名>.group=<消费组名>
设置主题: spring.cloud.stream.bindings.<通道名>.destination=<主题名>
给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>

2、消费者开启分区,指定实例数量与实例索引

代码语言:javascript
复制
开启消费分区: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
消费实例数量: spring.cloud.stream.instanceCount=1 (具体指定)
实例索引: spring.cloud.stream.instanceIndex=1 #设置当前实例的索引值

3、生产者指定分区键

代码语言:javascript
复制
分区键: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
分区数量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>

总结

大家大体都知道消息队列具体削峰填谷、异步、解耦等作用,当我们项目中可能涉及到引入多种消息队列时,则我们就可以考虑一下引用spring cloud stream来统一编程模型,让我们不再关注具体消息中间件,更专注于业务开发

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-stream-saveLog https://github.com/lyb-geek/springboot-learning/tree/master/springboot-stream-changeDbInfo

参考文档

Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收

https://www.cnblogs.com/hellxz/p/9396282.html#_label2

SpringCloud实战9-Stream消息驱动

https://www.cnblogs.com/huangjuncong/p/9102843.html

SpringCloud 之 Stream

https://www.jianshu.com/p/404fc32122d1

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Linyb极客之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是Spring Cloud Stream
  • 为什么需要Spring Cloud Stream消息驱动?
  • Spring Cloud Stream相关概念简介
  • Spring Cloud Stream 示例
  • 常用配置
  • 总结
  • demo链接
  • 参考文档
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档