首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

响应式编程——Reactor

Reactor介绍

Spring5更新后,其中有一个号称是可以替代SpringMVC的功能——Spring WebFlux,其是一个响应式变成框架。WebFlux是Spring5封装的Reactor框架。Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

环境

Reactor Core 运行于 Java 8 及以上版本。

了解BOM

使用 Reactor 的最简单方式是在你的项目中配置 BOM 以及相关依赖。 注意,当你这样添加依赖的时候,要省略版本()配置,从而自动使用 BOM 中指定的版本。

当然,如果你希望使用某个版本的 artifact,仍然可以指定。甚至完全不使用 BOM,逐个配置 artifact 的版本也是可以的。

Maven配置

Maven 原生支持 BOM。首先,你需要在 pom.xml 内通过添加下边的代码引入 BOM。如果 (dependencyManagement) 已经存在,只需要添加其内容即可。

注意dependencyManagement标签用来补充通常使用的dependencies配置。

然后,在dependencies中添加相关的 reactor 项目,省略,如下:

依赖 Core 库,没有 version 标签,reactor-test提供了对 reactive streams 的单元测试。

Gradle配置

Gradle 没有对 Maven BOM 的支持,但是你可以使用 Spring 的 gradle-dependency-management 插件。

首先,apply 插件。

然后用它引入 BOM

添加依赖

响应式编程

Reactor 是响应式编程范式的实现,总结起来有如下几点:

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用Flow类)。

响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有Iterable-Iterator这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

使用 iterator 是一种“命令式”(imperative)编程范式,即访问元素的方法是Iterable的唯一职责。关键在于,什么时候执行next()获取元素取决于开发者。在响应式流中,相对应的 角色是Publisher-Subscriber,但是当有新的值到来的时候,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。

除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个Publisher可以推送新的值到它的Subscriber(调用onNext方法), 同样也可以推送错误(调用onError方法)和完成(调用onComplete方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:

这种方式非常灵活,无论是有/没有值,还是 n 个值(包括有无限个值的流,比如时钟的持续读秒),都可处理。

那么我们为什么需要这样的异步响应式开发库呢?

阻塞是对资源的浪费

现在应用系统需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。

广义来说我们有两种思路来提升程序性能:

· 并行化(parallelize) :使用更多的线程和硬件资源。

· 基于现有的资源来提高执行效率 。

通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。

更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。

所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。

异步可以解决问题吗?

第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等异步调用返回结果再去处理。

但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:

· 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。

· Futures :异步方法 立即 返回一个 Future,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是立刻可以拿到,而是等实际处理结束才可用。比如, ExecutorService执行 Callable 任务时会返回 Future 对象。

这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。

回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。

考虑这样一种情景:在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):

回调地狱(Callback Hell)的例子

· 基于回调的服务使用一个匿名 Callback 作为参数。后者的两个方法分别在异步执行成功 或异常时被调用。

· 获取到收藏ID的list后调用第一个服务的回调方法 onSuccess。

· 如果 list 为空, 调用 suggestionService。

· 服务 suggestionService 传递 List 给第二个回调。

· 既然是处理 UI,我们需要确保消费代码运行在 UI 线程。

· 使用 Java 8 Stream 来限制建议数量为5,然后在 UI 中显示。

· 在每一层,我们都以同样的方式处理错误:在一个 popup 中显示错误信息。

· 回到收藏 ID 这一层,如果返回 list,我们需要使用 favoriteService 来获取 Favorite 对象。由于只想要5个,因此使用 stream 。

· 再一次回调。这次对每个ID,获取 Favorite 对象在 UI 线程中推送到前端显示。

这里有不少代码,稍微有些难以阅读,并且还有重复代码,我们再来看一下用 Reactor 实现同样功能:

使用 Reactor 实现以上回调方式同样功能的例子

· 我们获取到收藏ID的流

· 我们异步地转换 它们(ID) 为 Favorite 对象(使用 flatMap),现在我们有了 `Favorite`流。

· 一旦 Favorite 为空,切换到 suggestionService。

· 我们只关注流中的最多5个元素。

· 最后,我们希望在 UI 线程中进行处理。

· 通过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe)。

如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。

Reactor 中增加超时控制的例子

· 如果流在超时时限没有发出(emit)任何值,则发出错误(error)。

· 一旦收到错误,交由 cacheService 处理。

· 处理链后边的内容与上例类似。

Futures 比回调要好一点,但即使在 Java 8 引入CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future还有一个问题:当对Future对象最终调用get()方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181028G1DK3A00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券