前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >今日榜首|10年高级技术专家用7000字带你详解响应式技术框架

今日榜首|10年高级技术专家用7000字带你详解响应式技术框架

作者头像
愿天堂没有BUG
发布2022-10-28 11:17:13
1.5K0
发布2022-10-28 11:17:13
举报
文章被收录于专栏:愿天堂没有BUG(公众号同名)

响应式技术框架

目前在后端Web编程和微服务编程领域,存在多种响应式编程技术框架。

本篇我们从响应式编程规范开始介绍,进一步加深对响应式编程的理解。

响应式编程规范

对于响应式编程来说,响应式流是一种非阻塞、响应式、异步流处理、支持背压的技术标准,包括运行时环境(JVM和JavaScript)及网络协议。JDK 9发布的Flow API(java.util.concurrent.Flow)和响应式流规范呼应,成为响应式编程事实上的标准。

响应式流规范提供了一组最小化的接口、方法和协议来描述必要的操作和实体对象。

● Publisher:消息发布者。发布者只有一种方法,用来接受订阅者进行订阅(Subscribe)。T代表发布者和订阅者之间传输的数据类型,接口声明如下:

● Subscriber:消息订阅者。当接收到Publisher的数据时,会调用响应的回调方法。注册完成时,首先会调用onSubscribe方法,参数Subscriptions包含了注册信息。订阅者有四种事件方法,分别在开启订阅、接收数据、发生错误和数据传输结束时被调用,接口声明如下:

● Subscription:连接Publisher和Subscriber的消息交互的操作对象。Subscriber可以请求数据(request),或者取消订阅(cancel)。当请求数据时,参数“long n”表示希望接收的数据量,防止Publisher发送过多的数据。一旦开始请求,数据就会在流中传输。每接收一个,就会调用onNext(Tt);当发生错误时,onError(Throwable t)被调用;在传输完成后,onComplete()被调用。接口声明如下:

● Processor:同时充当Subscriber和Publisher的组件。可以看出,Processor接口继承了Subscriber和Publisher,是流的中间环节,接口声明如下:

响应式流中的数据从Publisher开始,经过若干Processor,最终到达Subscriber,即完整的数据管道(Pipeline)。

背压(Back Pressure)

在响应式编程规范中,响应式编程采用异步的发布-订阅模式。数据由Publisher推送消息给Subscriber。这种模式容易产生的问题是,当Publisher即生产者产生的数据速度远远大于Subscriber即消费者的消费速度时,消费者会承受巨大的资源压力(Pressure)而可能崩溃。

为了解决以上问题,数据流的速度需要被控制,即流量控制(Flow Control),以防止快速的数据流压垮目标。因此需要反压,即背压(Back Pressure),生产者和消费者之间需要通过一种背压机制来相互操作。这种背压机制要求是异步非阻塞的,如果是同步阻塞的,则消费者在处理数据时,生产者必须等待,会产生性能问题。

Java Flow API

从Java 9开始,增加了java.util.concurrent.Flow API,实现了响应式流规范(Reactive Stream Specification),并且把响应式流标准的接口集成到了JDK中。它和响应式流标准接口定义完全一致,之前需要通过Maven引用的API,Java 9之后可以直接使用了。响应式流的标准Maven依赖如下:

Java9通过java.util.concurrent.Flow和 java.util.concurrent.SubmissionPublisher实现了响应式流Flow类中定义的四个嵌套的静态接口,用于建立流量控制的组件,Publisher在其中生成一个或多个数据项供Subscriber使用。下面是Java 9 FlowAPI的核心组件。

● java.util.concurrent.Flow:这是Flow API的主要类,该类封装了Flow API的所有重要接口。需要说明的是,这个类声明为final类型,所以我们无法扩展它。

● java.util.concurrent.Flow.Publisher:每个发布者都需要实现此接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。

● java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口,订阅者按照严格的顺序调用方法,此接口有下面四种方法。

○ onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request就开始从处理器(Processor)接收项目。

○ onNext:当发布者收到项目时调用此方法,这是我们实现业务逻辑来处理流并向发布者请求更多数据的方法。

○ onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。

○ onComplete:这就像finally方法,在发布者没有发布其他项目或者发布者关闭时调用。可以用来发送流成功处理的通知。

● java.util.concurrent.Flow.Subscription:用于在发布者和订 阅 者 之 间 创 建 异 步 非 阻 塞 连 接 。 订 阅 者 调 用 请 求(request)方法来向发布者请求项目。它还有取消订阅(cancel)的方法,即关闭发布者和订阅者之间的连接。

● java.util.concurrent.Flow.Processor:此接口同时扩展了Publisher和Subscriber接口,用于在发布者和订阅者之间转换消息。

● java.util.concurrent.SubmissionPublisher : 这 个 类 是 对Publisher接口的实现,它将提交的项目异步发送给当前订阅者,直到它关闭。它使用Executor框架,我们将在响应式流示例中使用该类来添加订阅者,然后向其提交项目。

Java 9 Flow API接入实例

下面使用Java 9 Flow API实现一个简单的发布消息订阅的例子。

1.创建一个Item类,作为创建从发布者到订阅者之间的流消息的对象

2.实现一个帮助类,创建一个Item列表

3.实现消息的订阅

在步骤3中,Subscription变量保持消费者对生产者的引用,通过onNext ( ) 方 法 进 行 请 求 处 理 ;Count 变 量 记 录 请 求 个 数 ; 在onSubscribe方法中调用订阅请求来开始处理;在onError方法和onComplete方法中调用发生错误和完成时执行的业务逻辑。

4.使用主程序测试完成逻辑

在步骤4中,首先使用SubmissionPublisher、TestSubscriber创建发布者和订阅者。通过publisher.subscribe(subs)建立发布者与订阅者之间的关联关系;然后发布者通过submit方法发送消息给订阅者,这个过程是异步执行的;在主线程的while循环中判断Item的size和消费累计的size;当Item全部消费完成时,退出主线程的While循环;最后关闭发布者以免任何内存泄漏。

下面是程序的输出结果:

RxJava响应式框架

RxJava基于ReactiveX(Reactive Extensions的缩写)库和框架,使用观察者模式、迭代器模式及函数式编程,提供了异步数据流处理、非阻塞背压等特性。

Reactive Extensions

这个概念最早出现在微软的.NET社区中,目前越来越多语言实现了自己的响应式扩展,如Java、Javascript、Ruby等。

Reactive Extensions是响应式编程的一种实现,是解决异步事件流的一种方案。通俗地讲,就是利用它可以很好地控制事件流的异步操作,将事件的发生和对事件的响应解耦,让开发者不再关心复杂的线程处理、锁等并发相关问题。

RxJava的接入实例

RxJava 2.x实现了响应式流规范。它是Netflix开发的一个响应式编程框架。下面是RxJava的典型开发代码:

Observable

Observable可以理解为数据的发射器,对应Java Flow的发布者(Publisher)组件,通过create方法生成Observer对象。它会执行相关 业 务 逻 辑 并 通 过 emit 方 法 发 射 数 据 , 传 入 的 参 数 是ObservableOnSubscribe对象,使用泛型T作为操作对象的类型。你可以重写subscribe方法,里面是具体的数据源计划,前面的例子中是发射三个数字:1、2、3。ObservableEmitter是发射器的意思,有三种发 射 数 据 的 方 法 : void onNext( T value ) 、 voidonError(Throwable error)、void onComplete()。onNext方法可以 无 限 调 用 , 观 察 者 ( Observer ) 可 以 接 收 到 所 有 发 布 者(Publisher)发布的数据库,onError和onComplete是互斥的。

Observer

Observer 是 数 据 的 观 察 者 , 对 应 Java Flow 的 订 阅 者(Subscriber)组件,通过new方法创建并重写内部方法,onNext、onError、onComplete都是与被观察者发射的方法一一对应的。在本例中,订阅者的onNext方法处理消费数据逻辑,当收到的数据等于20时,将取消订阅,此时数据的发布者就不再向观察者推送数据。通过dispose方法可以取消Observer和Observable之前的订阅关系。

Scheduler

RxJava支持异步通信的特性是通过Schedulers组件实现的,Scheduler的中文意思是调度器。在RxJava中,可以通过Scheduler来控制调度线程,从Scheduler的源码可以发现它本质上是操纵Runnable对象,支持用立即、延时、周期形式来调度工作线程。RxJava 2.x中内置了多种Scheduler实现,适用于不同场景。这些Scheduler可以在代码中直接使用,屏蔽了开发者对线程调用的管理和控制。在前面的例子中我们使用了Schedulers.io()作为线程调度策略,下表总结的是Schedulers不同的线程调度策略。

Operator

RxJava在处理事件的流转过程中,提供了丰富的操作符,用来改变事件流中的数据。以Map操作符为例,Map的作用是将发射的事件进行Map函数定义的数据转换,再将转换后的事件发射给Observer。转换过程如下图所示。

(1)通过Emitter发射了1、2、3三个数字。

(2)中间通过Map进行转换,转换后事件变成10、20、30。

(3)最后将转换后的事件发射给Observer。

RxJava2-Android-Samples(GitHub开源项目)的Readme.md中总结了RxJava用到的所有操作符,篇幅所限,其他操作符可以从Reactive官方地址获得详解。RxJava的主要操作符如下表所示。

Reactor响应式框架

Reactor是Pivotal基于Reactive Streams规范实现的响应式框架 。 作 为 Spring 的 兄 弟 项 目 , 它 进 一 步 扩 展 了 基 本 的 ReactiveStreams Publisher及Flux和Mono API等组件,主要使用依赖的组件是Reactor Core模块。

Reactor项目已在GitHub中开源(可使用Reactor关键字搜索),主要包含Reactor Core和Reactor Netty两部分。Reactor Core实现了反应式编程的核心功能,Reactor Netty则是Spring WebFlux等技术的基础。

Reactor的接入实例

1.使用Reactor进行响应式编程,加载对应的Maven依赖

2.使用Reactor进行响应式编程的Demo

3.执行上述程序得到如下结果

在Reactor项目中,主要有与RxJava类似的发布者、订阅者、操作符等关键API和语法概念,下面结合代码实例讲解主要用到的模块。

Reactor的核心模块

● Flux

Flux是Reactor中数据发布者的重要抽象类。从源码中可以发现,Flux实现了Reactive Streams JVM API Publisher。Flux定义了0~N的非阻塞序列,类比非阻塞Stream,在Reactor中充当数据发布者的角色。在上述实例中,Flux通过just方法发布数据流。just方法是Flux常见的创建Stream的方法,此外,还可以通过create、generate、from等方法创建Flux数据流。上面例子中使用最简单的just方法完成了三个数字的构造和声明发布,如下图所示。

● Mono

Mono和Flux类似。从源码中可以发现,Mono同样实现了ReactiveStreams JVM API Publisher,实现了0~1的非阻塞结果,如下图所示。

● Subscriber

订阅者通过订阅操作,可以处理数据的请求,在订阅方法中需要重写onSubscribe、onNext、onError、onComplete方法来实现数据流的消费。Flux调用subscribe方法后会触发数据的发送,订阅者接收到数据后会触发onSubscribe方法。onSubscribe表示订阅动作的方式,准备发送给真正的消息接收者,然后执行subscription.request方法发送请求数据。代码例子中request(1)表示只发送一条数据,也可以使用subscription.cancel取消上游数据的传输。然后执行onNext方法进行消息的响应处理,在onNext方法中执行request方法可以把数据交给subscription链,循环处理所有数据。

● Operator

在Reactor项目中,一个Operator会给一个发布者(Publisher)添加某种行为,并返回一个新的Publisher实例。还可以对返回的Publisher再添加Operator连成一个链条。原始数据沿着链条从第一个Publisher开始向下流动,链条中的每个节点都会以某种方式去转换流入的数据。链条的终点是一个订阅者(Subscriber),Subscriber以某种方式消费这些数据,流程图如下图所示。

下面是对Reactor项目中Operator的总结分类,大致可以分为如下几类。

● 集合Operator:提供集合运算,如map、filter、sort、group、reduce等,和Java 8 Stream的中间操作具有相同的效果。

● 异 常 处 理 Operator : 提 供 异 常 处 理 机 制 , 如 retry 、onErrorReturn等。

● 回 调 Operator : 提 供 Publisher 状 态 转 换 时 的 回 调 , 如doOnCancel、doOnRequest等。

● 行为Operator:修改Publisher的默认行为,为其添加更多功能,如buffer、defaultIfEmpty、onBackpressureXXX等。

● 调试Operator:添加调试信息,如log、elapsed等。

Vert.X响应式编程

Vert.X是基于JVM构建的一个Reactive工具箱。同时,Vert.X和Spring类似,也有一套微服务开发生态。从开发者的角度来看,Vert.X就是一些库包,提供了HTTP客户端和服务器、消息服务、TCP和UDP底层协议等模块。你可以使用这些模块来构建自己的应用,也可以通过向Vert.X Core(Vert.X的基础组件)中增加任意模块来构建自己的系统。

Vert.X的主要功能

● Web开发,Vert.X封装了Web开发常用的组件,支持路由、Session管理、模板等。

● TCP/UDP开发,Vert.X底层基于Netty,提供了丰富的I/O类库,支持多种网络应用开发,不需要处理底层细节(如拆包和粘包),注重业务代码编写。

● 提供对WebSocket的支持,可以做网络聊天室、动态推送等。

● Event Bus(事件总线)是Vert.X的神经系统,通过Event Bus可以实现分布式消息、远程方法调用等。正是因为Event Bus的存在,Vert.X才可以更加便捷地开发微服务应用。

● 支 持 主 流 的 数 据 和 消 息 的 访 问 , 如 Redis 、 MongoDB 、RabbitMQ、Kafka等。

● 支持分布式锁、分布式计数器、分布式Map。

Vert.X的特性

● 异步非阻塞:Vert.X就像是跑在JVM上的Node.js(使用事件驱动、非阻塞式I/O模型的JavaScript运行环境),所以Vert.X的第一个优势就是它实现了一个异步的非阻塞框架。

● Vert.X支持多编程语言,在Vert.X上,可以使用JavaScript、Java、Scala、Ruby等语言。

● 不依赖中间件:Vert.X的底层依赖Netty,因此在使用Vert.X构建Web项目时,不依赖中间件。像Node一样,可以直接创建一个HttpServer,相对会更灵活一些,安全性也会更高一些。

● 完善的生态:Vert.X提供数据库操作、Redis操作、Web客户端操作等丰富的组件功能。

Vert.X的接入实例

1.加载对应的Maven依赖

2.Vert.X提供了一个创建HTTP服务器的简单方法,该服务器会在每次接收到HTTP请求时返回一个“Hello”的response

在这个例子里,我们创建了一个requestHandler来接收HTTP请求事件,并且返回响应。在Vert.X中,所有API都不会阻塞调用线程,如果不能立即响应结果,Handler会在事件准备好后处理,通过异步操作回调Handler方法触发执行。这种非阻塞的开发模型,可以使用较少的线程处理高并发场景。下面是Vert.X中EventLoop的工作模型图。

Verticle是Vert.X中的重要组件,可以理解成Java中的Servlet、POJO Bean或Akka中的Actor。一个组件可以有多个实例,Verticle实例之间的通信通过Event Bus实现。

ProducerVerticle负责监听8080端口,接收前端请求,它可以通过Event Bus发送一个事件,该事件将被传递给多个该事件的订阅者,代码如下。

ConsumeVerticle负责消费Event Bus的数据并返回响应,代码如下。

MainApp是启动类,在main方法中发布两个Verticle,下面代码是启动主流程的方法。

浏览器调用接口http://127.0.0.1:8080/book/1,出现下面结果则表示正确。

Verticle具有以下几个特点。

● 每个Verticle都占用一个EventLoop线程,且只对应一个EventLoop。

● 每个Verticle中创建的HttpServer、EventBus等资源都会在回收Verticle时被同步回收。

● 在多个Verticle中创建同样端口的HttpServer,会变成两个EventLoop线程,处理同一个HttpServer的连接,可以利用Verticle的这一特性来提升并发处理性能。

Spring Boot 2响应式编程

Spring Boot 2.x在Spring Boot 1.x基础上,基于Spring 5实现了响应式编程框架。从Spring MVC注解驱动的时代开始,Spring官方有意识地去Servlet化。不过在Spring MVC时代,Spring仍然摆脱不了对 Servlet 容 器 的 依 赖 , 然 而 借 助 响 应 式 编 程 ( ReactiveProgramming)的势头,Spring加速了这一时代的到来。WebFlux将Servlet容器从必须项变为可选项,并且默认采用Netty Web Server作为HTTP容器的处理引擎,形成Spring全新的技术体系,包括数据存储等技术栈。Spring Boot 2官方提供的基于Reactor与Servlet容器生态和技术栈的对比如下图所示。

对比发现,Spring Boot 2.x与Spring Boot 1.x在技术栈上存在巨大差异。Spring Boot 2.x最显著的变化就是采用了响应式的技术体系。底层的Reactive核心组件、响应式WebFlux框架、响应式数据存储、响应式安全、响应式Web服务引擎组成了Spring响应式技术体系。

下面列举了Spring Boot 2中支持响应式编程的部分模块。

Spring Core

Spring Core 是 Spring 的 核 心 模 块 。 Spring Framework 5 基 于ProjectReactor和RxJava反应式项目及响应式编程规范实现了对响应式编程的支持。在Spring Core中通过引入ReactiveAdapter实现了Object和Publisher<T>的相互转换,代码如下:

使用者可以通过继承ReactiveAdapter实现定制化的数据类型转换 。 ReactiveAdapterRegistry 可 以 作 为 对 象 池 来 保 持ReactiveAdapter实例并提供相应的数据访问方式。

响应式I/O

Spring Core提供了对I/O的响应式编程支持。Spring Core首先引入了一个字节缓存抽象接口DataBuffer,提供了一个DataBufferUtils工具类,可以实现以Reactive方式对I/O进行访问和交互。从下面的示例代码可以看到,DataBufferUtils返回了一个Flux对象,这样就可以使用Reactor相关接口读取test.txt文件,实现背压的响应式特性。

同时,Spring Core通过下面接口实现了基于响应式流的编解码实现类,这样可以方便DataBuffer实例与对象的相互转化,代码如下:

Spring WebFlux构建响应式Web服务

在Web服务方面,Spring 2.x提供了WebFlux框架,基于Flux和Mono对象实现响应式非阻塞Web服务。同时提供了一个响应式的HTTPWebClient,它可以通过函数式的方式异步非阻塞地发起HTTP请求并处理响应。Spring WebFlux也提供了响应式的WebSocketClient。下一节我们会详细讲解Spring的WebFlux框架。

数据层支持响应式

开发基于响应式流的应用,就像搭建数据流的管道,使异步数据能够顺畅流过每个环节。大多数系统免不了要与数据库交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。在消息的处理过程中,如果数据管道在任何一个环节发生阻塞,都有可能造成整体吞吐量的下降。

各个数据库都开始陆续推出异步驱动的技术支持,目前可以支持响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。

相关生态的响应式支持

● Spring 5实现了对Spring Security的响应式支持。

● Spring Cloud基于WebFlux框架实现了Spring Cloud Gateway微服务网关。

● Spring Test实现了响应式的支持类WebTestClient。

● 在监控领域,Sleuth也提供对响应式WebFlux的追踪支持。

本文给大家讲解的内容是响应式微服务架构,响应式技术框架

  1. 下篇文章给大家讲解的内容是响应式微服务架构,Spring WebFlux框架
  2. 觉得文章不错的朋友可以转发此文关注小编;
  3. 感谢大家的支持!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 愿天堂没有BUG 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 响应式技术框架
  • 响应式编程规范
  • Java Flow API
  • RxJava响应式框架
  • Vert.X响应式编程
  • Spring Boot 2响应式编程
  • 本文给大家讲解的内容是响应式微服务架构,响应式技术框架
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档