前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Stream 消息驱动

Stream 消息驱动

作者头像
BUG弄潮儿
发布于 2021-09-10 07:35:57
发布于 2021-09-10 07:35:57
3620
举报
文章被收录于专栏:JAVA乐园JAVA乐园

一、什么是Spring Cloud Stream?

  • 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
  • 应用程序通过inputs或者 outputs 来与Spring Cloud Stream中binder对象交互。
  • 通过我们配置来binding(绑定),而Spring Cloud Stream 的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
  • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
  • Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
  • 目前仅支持RabbitMQ、 Kafka

二、Stream的设计思想

1、标准MQ

  • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息必须走特定的通道 - 消息通道 Message Channel
  • 消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅。

2、为什么用Cloud Stream?

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了—种解耦合的方式。

3、Stream凭什么可以统一底层差异?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

4、通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

Binder

  • INPUT对应于消费者
  • OUTPUT对应于生产者

Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

三、Stream编码常用注解简介

1. Spring Cloud Stream标准流程套路
  • Binder - 很方便的连接中间件,屏蔽差异。
  • Channel - 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
  • Source和Sink - 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
2. 编码API和常用注解

四、Stream之消息重复消费

运行后有两个问题

  1. 有重复消费问题
  2. 消息持久化问题

消费

  • http://localhost:8801/sendMessage
  • 目前是8802/8803同时都收到了,存在重复消费问题
  • 如何解决:分组和持久化属性group(重要)
生产实际案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

五、Stream之group解决消息重复消费

1. 原理

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

8802/8803都变成不同组,group两个不同

group: A_Group、B_Group

六、Stream之消息持久化

  • 通过上述,解决了重复消费问题,再看看持久化。
  • 停止8802/8803并去除掉8802的分组group: A_Group,8803的分组group: A_Group没有去掉。
  • 8801先发送4条消息到RabbitMq。
  • 先启动8802,无分组属性配置,后台没有打出来消息。
  • 再启动8803,有分组属性配置,后台打出来了MQ上的消息。(消息持久化体现)

source:https://www.yuque.com/yanzipang-wf7ur/hkyrfw/vbkxz8

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 BUG弄潮儿 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Stream 消息驱动
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)
用户9615083
2022/12/25
3810
Stream 消息驱动
SpringCloud Stream消息驱动
  官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
别团等shy哥发育
2023/02/25
3650
SpringCloud Stream消息驱动
SpringCloud Stream消息驱动
cheese
2023/10/25
2950
SpringCloud Stream消息驱动
SpringCloud集成Stream
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)
大忽悠爱学习
2021/12/08
4560
SpringCloud集成Stream
springcloud : Stream消息驱动
官网 : https://spring.io/projects/spring-cloud-stream#overview
冷环渊
2021/10/19
6480
消息驱动(SpringCloud Stream)
官网:https://spring.io/projects/spring-cloud-stream#overview https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/ Spring Cloud Stream中文指导手册: https://m.wang1314.com/doc/webapp/topic/20971999.html
鱼找水需要时间
2023/02/16
3990
消息驱动(SpringCloud Stream)
15-SpringCloud Stream
应用程序通过inputs或者 outputs 来与Spring Cloud Stream中binder对象交互。
彼岸舞
2021/09/09
5120
15-SpringCloud Stream
微服务(十二)——Steam消息驱动&Sleuth链路监控
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)
不愿意做鱼的小鲸鱼
2022/09/26
3980
微服务(十二)——Steam消息驱动&Sleuth链路监控
SpringCloud之Stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。
shaoshaossm
2022/12/27
3340
SpringCloud之Stream
SpringCloud Stream 消息驱动
注意:有个大坑,视频里的 application.yml 使用了 spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.xx
OY
2022/03/17
2940
SpringCloud Stream 消息驱动
SpringCloud Stream消息驱动
目前市面上常用的四种消息中间件:ActiveMQ、RabbitMQ、RocketMQ、Kafka。由于每个项目需求的不同,在消息中间件的选型上也就会不同。
乐心湖
2021/01/18
8510
SpringCloud Stream消息驱动
springcloud之Stream
创建项目cloud-stream-rabbitmq-consumer8803与cloud-stream-rabbitmq-consumer8802是并列的消费者。 此时生产者发送消息,这两个消费者都会收到,因为默认情况下不同的微服务是不同组,是并列关系,只要改为竞争关系即可。
冬天vs不冷
2025/01/20
610
springcloud之Stream
SpringCloud Stream消息驱动代码实战
cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
一个风轻云淡
2022/11/13
2140
SpringCloud Stream消息驱动代码实战
springCloud --- 中级篇(3)
本系列笔记涉及到的代码在GitHub上,地址:https://github.com/zsllsz/cloud
贪挽懒月
2020/06/08
7970
springCloud --- 中级篇(3)
10SpringCloud Stream消息驱动
原因:rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问
Remember_Ray
2020/11/04
3320
10SpringCloud Stream消息驱动
SpringCloud Stream消息驱动
什么是SpringCloudStream,官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。 通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。  通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。 Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。 目前仅支持RabbitMQ、Kafka。
一个风轻云淡
2022/11/13
3300
SpringCloud Stream消息驱动
springboot实战之stream流式消息驱动
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现
lyb-geek
2019/08/21
4.8K0
Spring Cloud 系列之消息驱动 Stream
  在一个系统中我们可能包含前端页面、接口服务、大数据层,可能在接口服务中使用的是 RabbitMQ 而在大数据层中使用的是 Kafka,那么我只会 RabbitMQ 不会 Kafka 岂不是还要去学习,白天 996 晚上 007 简直要命。那么有没有一个像 JDBC 一样的能够屏蔽细节让我们可以迅速切换。   Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。它基于 Spring Boot 构建独立的、生产级的 Spring 应用,并使用 Spring Integration 为消息代理提供链接。应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前只实现了 Kafka 和 RabbitMQ 的 Binder。
Demo_Null
2020/11/24
1.4K0
Spring Cloud 系列之消息驱动 Stream
Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
通过《Spring Cloud构建微服务架构:消息驱动的微服务(入门)》一文,相信大家对Spring Cloud Stream的工作模式已经有了一些基础概念,比如:输入、输出通道的绑定,通道消息事件的监听等。下面在本文中,我们将详细介绍一下Spring Cloud Stream中是如何通过定义一些基础概念来对各种不同的消息中间件做抽象的。 下图是官方文档中对于Spring Cloud Stream应用模型的结构图。从中我们可以看到,Spring Cloud Stream构建的应用程序与消息中间件之间是通
程序猿DD
2018/03/21
1.2K0
Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
使用Spring Cloud Stream 构建消息驱动微服务
Spring Cloud Stream is a framework for building message-driven microservice applications.
烂猪皮
2019/12/05
1.5K0
相关推荐
Stream 消息驱动
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档