译:基于Spring Cloud Stream构建和测试 message-driven 微服务

Bryce Canyon National Park

作者: Piotr Mińkowski

译者: helloworldtang

Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案。您可以基于Spring Cloud Netflix库创建同步REST微服务,正如我在之前的一篇文章中所展示的那样 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服务指南。您可以使用Spring WebFlux项目在Netty上创建异步的、响应式的微服务,并将其与一些Spring Cloud库相结合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建响应式微服务。最后,您可以使用Spring Cloud Stream和类似Apache Kafka或RabbitMQ这样的broker来实现基于发布/订阅模型的message-driven微服务。构建微服务的最后一种方法是本文的主要主题。我将向您展示如何在RabbitMQ broker的基础上有效地构建、扩展、运行和测试消息传递微服务。

体系结构

为了演示Spring Cloud Stream的特性,我们将设计一个示例系统,该系统使用发布/订阅模型进行跨服务通信。我们有三个微服务: 、 和 。应用程序 暴露了负责处理发送到我们系统的订单的HTTP endpoint。所有传入的订单都是异步处理的—— 准备并发送消息到RabbitMQ exchange,然后就对调用的客户端进行响应,不需要等到消息被消费后再响应。应用程序的 和 正在侦听进入该RabbitMQ exchange的订单消息。微服务 负责检查客户账户是否有足够的资金来支付该订单需要的金额,如果有就从该账户扣款。微服务 检查是否有足够的库存,并在处理订单后改变可用产品的数量。 和 都通过RabbitMQ exchange(这一次是使用direct exchange的一对一通信)发送带有操作状态的异步响应。微服务 根据接收到的响应消息来更新订单状态,并通过REST endpoint 提供给外部客户端。

如果您觉得我们的示例描述有点难以理解,这里有一个用于澄清的架构图。

启用 Spring Cloud Stream

在项目中使用Spring Cloud Stream的推荐方法是使用依赖管理系统。Spring Cloud Stream有一个与整个Spring Cloud framework相关,并且独立发布的依赖管理。然而,如果我们已经在 版本的 部分声明了 ,就不需要在 中声明任何其他内容。如果您喜欢只使用Spring Cloud Stream项目,那么您应该定义以下部分。

下一步是将 artifact添加到项目依赖项中。我还建议您至少包括 库,以提供作为源请求进入 的发送消息用的 。

Spring Cloud Stream 编程模型

为了使您的应用程序能够连接到一个message broker,请在主类上使用 注解。 注解将一个或多个接口作为参数。您可以在Spring Cloud Stream提供的三个接口之间进行选择:

Sink:这是用来标记从入站通道接收消息的服务。

Source: 这是用来向出站通道发送消息的。

Processor:当你需要一个入站通道和一个出站通道时,它可以被使用,因为它继承了Source and Sink接口。因为 发送消息,并接收它们,它的主类已经使用了 注解。 下面是 项目中启用了Spring Cloud Stream binding的主类。

增加 message broker

在Spring Cloud Stream术语中,负责与特定message broker集成的实现称为binder。默认情况下,Spring Cloud Stream为 Kafka and RabbitMQ提供了binder实现。它能够自动检测和在类路径上查找binder。任何特定于中间件的设置都可以通过Spring Boot支持的外部配置属性来覆盖,譬如应用程序参数、环境变量,或者仅仅是 文件。为了包含对RabbitMQ的支持,RabbitMQ将这篇文章用作message broker,您应该向项目添加以下依赖项。

现在,我们的应用程序需要连接RabbitMQ broker的一个共享实例。这就是为什么我使用RabbitMQ在默认的5672端口上运行Docker镜像。它还可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下启动web仪表板。

我们需要通过设置属性 为Docker机器IP 192.168.99.100 ,来覆盖Spring Boot application的中的默认设置。

实现消息驱动的微服务

Spring Cloud Stream是在Spring Integration项目之上构建的。Spring Integration扩展了Spring编程模型,以支持众所周知的企业集成模式(EIP)。EIP定义了许多在分布式系统中经常使用的经典组件。您可能已经听说过诸如消息通道、路由器、聚合器或endpoints之类的模式。让我们回到上面的例子。让我们从 开始,它负责接收订单,将它们发布在shared topic上,然后从下游服务收集异步响应。下面是@service,它使用 bean来构建消息并将其发布到远程topic。

这个 是由controller调用,controller暴露提交新订单和通过 获得订单状态的HTTP endpoints。

现在,让我们更仔细地看看消费端。来自 的 bean所发送的消息是由 和 接收。为了从 topic exchange中接收消息,我们只需要在入参为Order的方法上添加 注解。我们还必须为监听器定义目标通道——在这种情况下,它是 。譬如:

接收订单由 bean处理。 会根据客户账户上是否有足够的资金来实现订单接受或拒绝订单。验收状态的响应通过 bean调用的输出通道发回 。

最后一步是配置。它是在 中提供的。我们必须正确地定义通道的destination。而 则将 destination分配给输出通道,而 destination则是输入通道, 和 则恰恰相反。这是合乎逻辑的,因为通过其输出destination通过 发送的消息是通过其输入destination接收的服务接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 的配置设置。

这是为 和 提供的配置。

最后,您可以运行上面示例中的微服务。现在,我们只需要运行每个微服务的单个实例。您可以通过运行JUnit测试类 来轻松地生成一些测试请求,这是在我的源代码库中提供的 中提供的。这种情况下很简单。在下一篇文章中,我们将学习更高级的示例,其中包含多个正在运行的消费服务实例。

扩展

为了扩展我们的Spring Cloud Stream应用程序,我们只需要启动每个微服务的附加实例。他们仍然会侦听与当前正在运行的实例相同的 topic exchange 中的传入消息。在添加了一个 和 的实例之后,我们可以发送一个测试订单。这个测试的结果对我们来说是不令人满意的… 为什么?每个微服务运行的所有实例都接收到了这个订单。这正是 topic exchanges 的工作方式——发送到topic的消息被所有的消费者接收,他们正在侦听这个topic。幸运的是,Spring Cloud Stream能够通过提供称为consumer group的解决方案来解决这个问题。它负责保证一个消息只被一个实例处理,如果它们被放置在一个相互竞争的消费者关系中。在运行多项服务实例时,对consumer group机制的转换已经在下图中可视化了。

一个 consumer group 机制的配置不是很困难。我们只需要设定 参数,并给出给定destination的组名。下面是 的当前binding配置。 destination地是一个为直接与 通信而创建的队列,因此只有 被分组使用 属性。

Consumer group机制是Apache Kafka的一个概念,它也在Spring Cloud Stream中实现,也适用于RabbitMQ broker,它本身并不支持它。因此,我认为它在RabbitMQ上的配置非常有趣。如果您在destination运行两个服务实例,而没有在destination设置组名,那么就会有两个为单个交易所创建的bindings(每个实例一个bindings),如下图所示。因为有两个应用程序在这个exchange中监听,总共有四个binding分配给那个exchange。

如果您为选定的destination Spring Cloud Stream设置组名,则将为给定服务的所有运行实例创建单一binding。binding的名称将以组名为后缀。

因为,我们已经在项目依赖项中包含了 ,在实现 POST endpoint的单个请求时,在交换的所有异步请求之间发送相同的 头部。由于这个原因,我们可以使用Elastic Stack (Kibana)轻松地将所有日志关联起来。

自动化测试

您可以轻松地测试您的微服务,而不需要连接到message broker。要实现它,您需要将 包含到您的项目依赖项中。它包含 bean,它允许您与绑定通道进行交互,并检查应用程序发送和接收的任何消息。

在测试类中,我们需要声明 bean,它负责接收由 保留的消息。这是我的 测试类。使用 bean,我将测试订单发送到输入通道。然后, 接收到通过输出通道发送回 的消息。测试方法的 创建了应该被帐户服务接受的顺序,而 方法则设置了过高的订单价格,从而导致拒绝订单。

总结

当您不需要来自API的同步响应时,Message-driven的微服务是一个不错的选择。在本文中,我展示了在您的微服务之间的跨服务通信中发布/订阅模型的示例用例。源代码在GitHub上是常见的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源码maven不能运行,这个项目fork原代码并修复了错误】)。对于使用Spring Cloud Stream库、Apache Kafka的更有趣的例子,您可以参考我的书中第11章,Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180715G0907700?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券