首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

精通springcloud:消息驱动的微服务,了解Spring Cloud Stream

消息驱动的微服务

我们已经讨论了 Spring Cloud 提供的是基于微服务架构的许多功能。但是,我们一直在考虑的其实都是基于 RESTful 的同步通信服务。在“微服务简介”中曾经提到过,还有其他一些流行的通信方式,如发布/订阅或异步、事件驱动的点对点消息传递等,后者就是本章将要介绍的一种微服务实现方法,它和前面章节所介绍的基于 RESTful 的同步通信服务有所不同。

本章还将详细讨论如何使用 SpringCloudStream 来构建消息驱动的微服务。

本章将要讨论的主题包括:

口与 Spring Cloud Stream 相关的主要术语和概念。

口使用 RabbitMQ 和 Apache Kafka 消息代理作为绑定器。

口 Spring Cloud Stream 编程模型。

口绑定、生成器和使用者的高级配置。

口扩展、分组和分区机制的实现。

口多个绑定器支持。

了解 Spring Cloud Stream

Spring Cloud Stream 构建于 Spring Boot 之上。它允许开发人员创建独立的、生产级的 Spring 应用程序,并使用 Spring Integration 来帮助实现与消息代理的通信。使用 SpringCloud Stream 创建的每个应用程序都可以通过输入和输出通道与其他微服务集成。这些通道通过与中间件相关的绑定器( Binder)实现连接到外部消息代理。有两种内置的绑定器实现一 Kafka 和 Rabbit MQ.

Spring. Integration 扩展了 Spring 编程模型,以支持众所周知的企业集成模式(Enterprise Integration Pttens, EIP)。EIP 定义了许多通常用于分布式系统中的协作的组件。读者可能已经听说过消息通道、路由器、聚合器或端点等模式。Spring Integration 框架的主要目标是提供一个基于 EIP 构建 Spring 应用程序的简单模型。如果读者对有关 EIP 的更多详细信息感兴趣,请访问网站 ht:/www enterpriseintegrationpatterms com/patterns/messaging/toc .html.

构建消息传递系统

我们认为引入主要 Spring Cloud Stream 功能的最合适方式是通过基于微服务的示例系统。我们将轻松修改前面章节中讨论过的系统架构。这里不妨简要回顾一下这种架构。我们的系统负责处理订单。它由 4 个独立的微服务组成。order-service 微服务首先与 product-service 服务进行通信,以便收集所选产品的详细信息,然后通过 customer-service 服务来检索有关客户及其账户的信息。现在,发送到 order-service 服务的订单将被异步处理。此时仍有一个公开的 RESTful HTTP API 端点用于客户端提交新订单,但应用程序不会处理它们。它只保存新订单,将其发送到消息代理,然后给客户端发送响应,表示订单已被批准处理。当前讨论的示例的主要目标是显示点对点通信,因而消息将仅由一个应用程序(account-service 服务)接收。图 11.1 说明了这个示例系统的架构。

收到新消息之后,account-service 服务会调用 product-service 公开的方法以查找其价格。它从账户中提取资金,然后将响应发送回 order-service 服务(包含当前订单的状态)。该消息也通过消息代理发送。order-service 微服务将接收消息并更新订单状态。如果外部客户端想要检查当前订单状态,它可以调用公开 find 方法的端点,查找订单的详细信息。该示例应用程序的源代码可在 GitHub ( htp:itb co/inin/sampl-spring- cloud-messaging.git)上获得。

 启用 Spring Cloud Stream

在项目中包含 Spring Cloud Stream 的推荐方法是使用依赖项管理系统。Spring CloudStream 具有与整个 Spring Cloud 框架相关的独立版本列车管理。但是,如果在 dependencyManagement 部分的 Edgware.RELEASE 版本中声明了 spring-cloud-dependencies,那么就不必在 porm.xml 中声明任何其他内容。如果开发人员只想使用 Spring Cloud Stream 项目,则应定义以下部分。

<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-st ream-dependencies</artifactId> <version>Ditmars。SR2</version> <type>pom</type> <scope> import</scope> </dependency> </dependencies> </ dependencyManagement>

下一步是将 spring-cloud-steam 添加到项目依赖项中。此外,建议开发人员至少包含 spring- cloud-sleuth 库,以提供发送消息功能和 traceld,这个 traceld 与通过 Zuul 网关传入 order- service 服务的源请求的 traceld 相同。

<dependency> <groupId>org . springframework. cloud</groupId> <artifact Id>spring-cloud-stream</artifactId> </ dependency> <dependency> <groupId>org. springframework. cloud</groupId> cartifactId>spring-cloud-sleuth</artifactId> </dependency>

要为应用程序启用与消息代理的连接,请使用 @EnableBinding 注解主类。

@EnableBinding 注解可以将一个或多 个接口作为参数。可以在 Spring Cloud Stream 提供

的 3 个接口之间选择。

口 Sink: 用于标记从入站通道接收消息的服务。

口 Source:用于向出站频道发送消息。

口 Processor:可用于需要入站通道和出站通道的情况,因为它扩展了 Source 和 Sink 接口。由于 order-service 服务发送消息以及接收消息,因而其主类已使用 @EnableBinding ( Prossosrcass)进行注解。

以下是支持 Spring Cloud Stream 绑定的主要 order-service 服务类。

@SpringBootApplication @EnableDiscoveryClient @EnableBinding (Processor.class) public class OrderApplication { public static void main(String[] args) { new SpringApplicationBuilder (OrderApplication.class) .web(true) .run (args); } }

声明和绑定频道

由于使用了 Spring Integration,因而该应用程序独立于项目中包含的消息代理实现。Spring Cloud Stream 将自动检测并使用类路径中找到的绑定器,这意味着开发人员可以选择不同类型的中间件,并配合相同的代码使用。所有与中间件相关的设置都可以通过外部配置属性覆盖,并且采用 Spring Boot 支持的形式,如应用程序参数、环境变量或 application.yml 文件。

如前文所述,Spring Cloud Stream 为 Kafka 和 Rabbit MQ 提供了绑定器实现。要包含对 Kafka 的支持,请将以下依赖项添加到项目中。

<dependency> <groupId>org。springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream- kafka</artifactId> </ dependency>

就个人而言,笔者更喜欢 RabbitMQ,本章将为 RabbitMQ 和 Kafka 各创建一个示例。 由于前面的章节已经讨论过 RabbitMQ 的功能,所以现在就先从基于 RabbitMQ 的示例开始。

<dependency> <groupId>org. spr ingf ramework.cloud</groupId> <artifactId> spring-cloud-starter -stream rabbit</artifactId> </dependency>

启用 Spring Cloud Stream 并包含绑定器实现后,即可创建发送消息者(Sender) 和侦听消息者(Listener) 。现在可以从负责向代理发送新订单消息的生产者(Producer) 开始。这是通过 order-service 中的 OrderSender 实现的,它使用 Output bean 发送消息。

@Service public class OrderSender { @Autowired private Source source; public boolean send (Order order) return this.source.output() .send (MessageBuilder .withPayload (order) .build( ) ); } }

该 bean 由控制器调用,它公开允许提交新订单的 HTTP 方法。

@RestController public class OrderController { private static final Logger LOGGER = LoggerFactory.getLogger (OrderController.class); private ObjectMapper mapper new ObjectMapper(); @Autowired OrderRepository repository; @Autowired OrderSender sender ; @PostMapping public order process (0RequestBody order order) throws JsonProcess ingException ( Order O = repository.add(order); LOGGER.info("Order saved: }", mapper .writeValueAsString (order)); boolean isSent 一 sender 。send(o) ; LOGGER. info("order sent:{ } ", mapper.writeValueAsstring (Collections.singletonMap ("issent", isSent) ) ); return o; } }

包含订单信息的消息已发送到消息代理。现在,它应该通过 account-service 服务接收。要完成这一操作, 必须声明接收器,接收器将侦听传入队列的消息,这个消息是在消息代理上创建的。要接收带有订单数据的消息,只需要使用 @Sreaml istener 注解让该方法采用 Order 对象作为参数。

@SpringBootApplication @EnableDiscoveryClient @EnableBinding (Processor.class) public class AccountApplication { @Autowired AccountService service; public static void main(String[] args) { new SpringAppl icationBuilder (AccountApplication.class) .web (true) . run(args); } @Bean astreamListener (Processor。INPUT) public void receiveOrder (Order order) throws JsonProcessingException { service.process (order); } }

现在可以启动示例应用程序。但是,这里还有一个尚未提及的重要细节。这两个应用程序都尝试连接在 localhost 上运行的 RabbitMQ,并且它们都将相同的交换( Exchange )信息视为输入或输出。这是一个问题,因为 order-service 服务将消息发送到输出交换信息,而 account-service 服务又将侦听传入其输入交换消息。这些是不同的交换信息,但先者恒先。接下来不妨就从运行消息代理开始。

本文给大家讲解的内容是消息驱动的微服务

  1. 下篇文章给大家讲解的是自定义与 RabbitMQ 代理的连接;
  2. 觉得文章不错的朋友可以转发此文关注小编,有需要的可以私信小编获取;
  3. 感谢大家的支持!

原文链接:https://www.toutiao.com/a6969440002333884931/?log_from=b4d1a2e3c2e6c_1622784284296

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/48298fd6676af9c52cb8bdc93
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券