前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Spring Cloud]Stream组件介绍

[Spring Cloud]Stream组件介绍

原创
作者头像
宇宙无敌暴龙战士之心悦大王
修改2023-03-14 17:54:19
4.5K0
修改2023-03-14 17:54:19
举报
文章被收录于专栏:kwai

SCS 在 3.x 做了很大的改动,废除了诸如 @StreamListener、@Input、@Output 等类,保留了 Binder、Binding,并提供了批量消费的支持。 本着学新不学旧的原则,本文将介绍 SCS 3.x 相关内容。 由于关于 spring cloud stream kafka 的文档比较充足,本文就此为例介绍 SCS。

Binder 是提供与外部消息中间件集成的组件,为 Binding 提供了 2 个方法,分别是 bindConsumer 和 bindProducer,它们用于构造生产者和消费者。 Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。

Binder 事务

不要在事务中尝试重试和提交死信。重试时,事务可能已经回归。如果想要提交死信用于善后,那么可以使用 DefaultAfterRollbackProcessor 以在回滚之后提交死信。

Error Channel

binder 会使用 Error Channel 向消费者传递异常,同时可以配置异步生产者发生异常时将异常传递到 Error Channel。

Dead-Letter

默认情况下,某 topic 的死信队列将与原始记录存在于相同分区中。

死信队列中的消息是允许复活的,但是应该避免消息反复消费失败导致多次循环进入死信队列。

应该使用一个专门的处理程序用来对这些死信队列的信息进行善后。

Consumer 消费者

顾名思义,Consumer 定义的是一个消费者,他是一个函数式接口,提供了消费消息的方法。我们可以直接在 Bean 声明中使用 lambda 表达式实现它。

值得注意的是,Consumer 还是一个泛型接口,通过泛型来绑定消息的类型。接收消息的类型我们会用到 KStream 类,他将与发送消息时定义的 KStream 对应,是键值对组成的抽象记录流,但相同 key 的记录不会被覆盖。

代码语言:javascript
复制
@Bean
public Consumer<KStream<Object, String>> consumer() {
		return input -> input.foreach((key, value) -> {
				do consume;
		});
}

当我们在应用程序中声明返回 Consumer 的 Bean,那么这个 Bean 就会自动接入消息队列。另外,我们需要用到 spring.cloud.stream.bindings.{beanName}-in-{idx}={topic} 来设置订阅的消息主题。默认情况下,topic 与 beanName 同名。

spring.cloud.stream.bindings.consumer-in-0 = userBuy

当接收到消息时,就会调用 Consumer 定义的 accept 方法进行消息消费。

发送消息 生产者

SCS 并没有对发送消息做出一个具体的封装,而是建议通过各个消息队列支持的 client 或者 template 发送消息。

代码语言:javascript
复制
		kafkaTemplate.send(message);

Function 加工厂

但有时候,我们需要对数据进行加工后发送回消息队列中,这个时候就会用到 Function。

它和 Consumer 类似,但是方法多了一个返回值。同样的,这个返回值需要用到 KStream 类,这样就能够支持将处理完的数据返回到消息队列。

代码语言:javascript
复制
@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
		return input -> input.map((key, value) -> {
			do process;
			return new KeyValue(key, value);
		})
}

spring.cloud.stream.bindings.{beanName}-out-{idx}={topic} 来设置出口的消息主题。默认情况下,topic 与 beanName 同名。

Function 相比生产者或消费者,更像是将消息进行加工,这个过程可以对消息进行一系列的处理,包括消息拆分,消息过滤和计算中间结果等。常见的一个用途就是国际化消息和多平台通知。

国际化消息就是对消息进行本地化,Function 就类似一个翻译官的功能,将翻译好的消息转达给消费者。

有时候我们也需要同时对多个平台推送通知,比如邮件、短信等。一般来说,邮件服务器和短信服务器不会写死消息的模板以提高泛用性,这个时候就需要中间人对消息进行加工,嵌入对应平台的模板。

多输出绑定

上面提到了消息拆分,Function 允许多个 topic 的消息发送,返回值上会用到 KStream 数组,然后配置上会用到方才展示的 spring.cloud.stream.bindings.{beanName}-out-{idx}={topic},idx 代表的就是返回值 KStream 在数组中的索引。

多输入绑定

多输入绑定在普通应用程序上很少用到,一般用于分布式计算。比如除法计算需要同时拥有除数和被除数。分布式计算也是 SCS 的一大用处之一,知识盲区,在此不多做介绍。

KStream

上面多次提到了 KStream,它实质上是一个顺序且可不断增长的数据集,是数据流的一种。

KTable

KTable 与 KStream 类似,但是与 KStream 不同的是,他不允许 key 的重复。 面对相同 key 的数据,会选择更新而不是插入。 KTable 实质上也是数据流,他的实现类同样继承了 AbstractStream。 可以将他看成某一时刻,KStream 的最新快照。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Binder 事务
  • Error Channel
  • Dead-Letter
  • Consumer 消费者
  • 发送消息 生产者
  • Function 加工厂
  • 多输出绑定
  • 多输入绑定
  • KStream
  • KTable
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档