| 导语 反应式编程是在命令式编程、面向对象编程之后出现的一种新的编程模型,是一种以优雅的方式,通过异步和数据流来构建事务关系的编程模型。本文包括反应式编程的概述和 RxPy 实战,以及怎样去理解反应式编程才能更好的把它融入到我们的编程工作中,把反应式编程变成我们手中的利器。
1. 反应式编程概述
在 google 趋势中搜索反应式编程,可以看到其趋势在 2013 年后一直是往上走的。如图1所示:
[ 图1 google 趋势搜索结果 ]
为啥呢?为啥是 2013 年才有明显的变化,因为2013 年后才有可以大范围使用的框架和库出现,才有人专门投入去布道反应式编程这个事情。
在范围缩小到中国,这个结果有点意思了,如图 2 所示:
[ 图2 google趋势搜索结果 ]
在中国主要是北上广深和杭州,说明什么,这些技术还是一线城市的开发同学才会使用,查看左下角主要是主题都是java相关,查看右上角,浙江省用得比较多,说明阿里是主要的使用方。
反应式编程又叫响应式编程,在维基百科中,其属于声明式编程,数据流。
其定义为:
反应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 (propagation of change) 的声明式 (declarative) 的编程范式。
换句话说:使用异步数据流进行编程,这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
反应式编程提高了代码的抽象级别,可以只关注定义了业务逻辑的那些相互依赖的事件。
反应式编程最着名的实现是 ReactiveX,其为 Reactive Extensions 的缩写,一般简写为 Rx ,发展历程如图 3 所示:
[ 图3 Rx来历 ]
微软 2009 年 以 .Net 的一个响应式扩展的方式创造了Rx,其借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。2012 年 Netflix 为了应对不断增长的业务需求开始将 .NET Rx 迁移到 JVM 上面。并于 2013 年 2 月份正式向外发布了 RxJava 。
在 2014 年 9 月 16 号,反应式宣言正式发布了 2.0 版本。在 2.0 之前,这份宣言的中文翻译标题,实际上是”响应式宣言“,而非”反应式宣言“
在反应式宣言中的 ”Reactive“ 实际上是指一个副词,表示系统总是会积极主动、甚至是智能地对内外的变化做出反应。所以这里叫反应式编程会更贴切一些.
反应式宣言是一份构建现代云扩展架构的参考方案框架。这个框架主要使用消息驱动的方法来构建系统,在形式上可以达到弹性和回弹性,最后可以产生即时响应性的价值。如图 4 所示:
[ 图4 反应式编程 ]
反应式系统具有如图所示的4个特性:
前三种特性(即时响应性, 回弹性, 弹性)更多的是跟你的架构选型有关,我们可以很容易理解像 Microservices、Docker 和 K8s 这样的技术对建立反应式系统的重要性。
这里要特别要提一下回压(Backpressure), Backpressure 其实是一种现象,在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。这句话的重点不在于”上游生产速度大于下游消费速度”,而在于”Buffer 溢出”。回压和 Buffer 是一对相生共存的概念,只有设置了 Buffer,才有回压出现;只要设置了 Buffer,一定存在出现回压的风险。
比如我们开发一个后端服务,有一个 Socket 不断地接收来自用户的请求来把用户需要的数据返回给用户。我们服务所能承受的同时访问用户数是有上限的,假设最多只能承受 10000 的并发,再多的话服务器就有当掉的风险了。对于超过 10000 的用户,程序会直接丢弃。那么对于这个案例 10000 就是我们设置的 Buffer,当超过 10000 的请求产生时,就造成了回压的产生;而我们程序的丢弃行为,就是对于回压的处理。
对于回压我们一般有两种处理方式,一种就是上面举例中的拒绝或丢弃,这是否定应答的方式,另一种是肯定应答,先收下来,然后再慢慢处理。
[图5 适用场景 ]
Rx 适用于前端,跨平台,后端等场景,其中在Angular 2.x,vue,react版本中已经有了Rx的实现可以使用,并且作为其核心的特性在宣传;Rx支持多达18种语言,在各平台都可以使用,具有很强的跨平台特性;在后端,通过异步调用,简单的并发实现,可以实现松耦合的架构。
18种语言Rx系统的框架出现比较早,已经发布了v2版本了,Rx* 系列语言支持如下:
Java: RxJava JavaScript: RxJS C#: Rx.NET C#(Unity): UniRx Scala: RxScala Clojure: RxClojure C++: RxCpp Lua: RxLua Ruby: Rx.rb Python: RxPY Go: RxGo Groovy: RxGroovy JRuby: RxJRuby Kotlin: RxKotlin Swift: RxSwift PHP: RxPHP Elixir: reaxive Dart: RxDart
框架支持:
RxCocoa: RxCocoa是RxSwift的一部分,主要是UI相关的Rx封装 RxAndroid: RxAndroid 源于RxJava,是一个实现异步操作的库,具有简洁的链式代码,提供强大的数据变换。 RxNetty: RxNetty 是一个响应式、实时、非阻塞的网络编程库,基于 Netty 这个著名的事件驱动网络库的强大功能。支持Tcp/Udp/Http/Https。支持>RxJava。RxNetty 在 NetFlix公司的各种产品中得到了广泛的应用。 Reactor: Reactor相对出生较晚,有发展前景Akka,scala系,用户基础薄弱
[ 图6 哪些公司在用Rx ]
2. RxRy入门
Rx的组成包括5部分,被观察者或者叫发射源,观察者/订阅者或者叫接收源,订阅,调度器,操作符。
我们经常用如图7所示的示例图来表示数据流动的过程。
[ 图7 ]
图中上面这条线表示被观察者的时间线,表示输入,从左到右输入项,中间的各种颜色的块块是我们要观察的项,最后的竖线表示输入结束。
Flip是变换过程,对所有的项做变换。下面这条线是变换的结果,也就是输出,同样各种颜色的块块是要观察的结果的项,xx表示异常中断。
需求如下:
从输入框获取输入,从第 10 次输入开始取前5次的输入,打印出来。
这是一个命令式编程的示例,我们需要将需求转换成命令式的描述,引入了计数变量,通过计数变量来跳过输入,然后再根据计算变量来标记取数的次数,打印出来,代码如图8所示:
[ 图8 ]
换成反应式编程,代码如图 9 所示:
[ 图9]
这是一个反应式的面向数据流的示例,创建流,跳过前 10 个项,取前5次,打印出来。如图 10 所示为其数据流动示例。
[ 图10 ]
图片来源:
https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava
对比命令式编程和反应式编程,区别如下:
Rx主要是做三件事:
下面我们以文档+代码的方式介绍这三件事情。
RxPy 有 10 种用于创建 Observable 的操作符,如下:
create 从头创建一个 Observable ,在 observer 方法中检查订阅状态,以便及时停止发射数据或者运算。
observer 包含三个基本函数:
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。如果在队列中调用了其中一个,就不应该再调用另一个。
示例代码见附件
变换常见的操作符有 6 个:
其中 flat_map 和 map 是两个非常重要的操作符,map 的操作很简单,就是传入一个函数,这个函数会将数据进行转换,一个输入对应一个输出
flat_map 和 map 不同,其返回值是一个 Observable,一个输入对应多个输出。
这两个操作的使用场景很好区分,当转换过程是同步过程时,使用 map,当转换过程是异步过程时使用 flat_map。
Group by 在工作中操作数据库的时候经常用到,就是按某个字段分组,在这里也是相同的意思,会按传递的函数生成的key来分组,注意这里的返回是一个分组的Observable,不能直接订阅,需要再做一次处理。
示例代码见附件
过滤用于从 Observable 发射的数据中进行选择,其常见操作符如下:
其中最常用的是 filter,filter 就是过滤,对于数据流,仅发射通过检测的项,有点像 SQL 中的 where 条件,只是这里的条件是一个函数,他会遍历一个个项,并执行这个函数,看是否满足条件,对于 满足条件的才会给到输出流。
示例代码见附件
合并操作符或者叫组合操作符,其常见如下:
其中 merge 和 concat 都是合并流,区别在于一个是连接,一个是合并,连接的时候是一个流接另一个流,合并的流是无序的,原来两个流的元素交错,当其中一个结束时,另一个就算是没有结束整个合并过程也会中断。
示例代码见附件
这些操作符可用于单个或多个数据项,也可用于 Observable。其常见如下:
示例代码见附件
3. RxPy实战
需求描述:
从文件中读取所有QQ号,并对QQ号去重统计
代码如下:
如果文件中有多列,或者是某些字符间隔,在返回的时候再多加一个map,做一次拆分即可,不用再写循环处理,更直接。这里和前面示例不同在于有一个publish。publish 将一个普通的 Observable 转换为可连接的,可连接的Observable 和普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了 Connect 操作符时才会开始,这样可以更灵活的控制发射数据的时机。比如我们这里需要有多个观察者订阅的时候。
需求描述:
获取新浪的美股接口数据,并打印出股票名和价格
代码如下:
需求描述:
从MySQL数据库中读取用户信息并打印出来
代码如下:
需求描述:
将文章信息列表关联作者名称
代码如下:
需求描述:
以多线程的方式,按列表读取新浪接口美股的数据
代码如下:
4. 小结
5. 写在最后
反应式编程已经在淘宝有一些应用,比如在淘宝的猜你喜欢,我的淘宝,都已经实践,其QPS,RT都有较大优化效率,这些点的应用需要对整个业务框架做一次升级 ,主要包括编程框架、中间件,以及业务方的升级等。
其中中间件的升级,包括服务框架(RPC)、网关、缓存、消息(MQ)、DB(JDBC)、限流组件、分布式跟踪系统、移动端 Rx 框架等等。这是一个很大的升级。而反应式架构在各个模块上基本都有成熟的方案,除了个别领域如数据库,基本没有特别的瓶颈。
学习反应式编程主要在于思维转换,因为之前主要使用同步式命令式编程的思维写程序,突然要换成以流的方式编写,思维必须要做转换,比如如何通过使用类似匹配、过滤和组合等转换函数构建集合,如何使用功能组成转换集合等等,当思维转变后,一切都会变得非常自然和顺滑。
这篇文章从网上找了很多的资料,面网上的资料非常有限,特别是RxPy的,基本只有官方的说明文档。
谨以此抛砖,希望有更多的同学可以了解多一种编程范式,把它融入到我们的编程工作中,把反应式编程变成我们手中的利器。
6. 参考资料
Rx(Reactive eXtension)官网 http://reactivex.io/
https://zhuanlan.zhihu.com/p/27678951
https://www.jianshu.com/p/757393ee4a2f
https://blog.csdn.net/maplejaw_/article/details/52396175
《维基:响应式编程》
《响应式架构与 RxJava 在有赞零售的实践》
《全面异步化:淘宝反应式架构升级探索》