首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

corda ledger系列教5 观察者模式、订阅-发布模式和响应式编程

这篇教程相对比较独立,主要目的是给大家一点时间去消化上一篇corda ledger系列教程4 节点服务(上)里面的设计思想和业务架构,大家一定要动手去看看源代码,没有头绪的可以从我的第一篇教程看起,后台留言或者在 corda ledger 技术群提问,同时也为我们接下来的源码课程打下基础,消除一些技术障碍。

大家知道响应式编程最近比较火热,无论在后端、微服务还是前端框架,都大面积的使用,用 ReactiveX 官网话来说:ReactiveX is more than an API, it's an idea and a breakthrough in programming. 而在我们的类区块链系统 corda ledger 中,跨事务、跨JVM虚拟机的记账操作也免不了采用响应式设计。所以今天穿插的为大家讲解一下响应式编程,思路基本上是从观察者模式讲起,然后介绍观察者模式的演进版本发布-订阅模式,最后引入响应式编程,目的是给大家自主深入学习提供一点思路,也为了后面能更加顺畅的理解相关源码丰富技术背景。

一、观察者模式

观察者模式是著名的GoF中的二十三个设计模式之一,它描述了如何解决重复出现的设计问题,以设计灵活且可重用的面向对象软件,使得对象更容易实现,更改,测试和重用。 翻译自维基百科(The Observer design pattern is one of the twenty-three well-known "Gang of Four" design patterns that describe how to solve recurring design problems to design flexible and reusable object-oriented software, that is, objects that are easier to implement, change, test, and reuse)

解决的问题:

对象之间的依赖关系(尤其是一对多的关系),避免出现紧耦合

确保一个对象的状态发生变更后,其他依赖于此的对象可以自动的更新此状态

确保一个对象拥有通知其他对象的能力

如何解决:

定义Subject和Observer两个对象

当Subject改变其状态对象State时,所有已经注册的Observer对象都会被通知,并且自动更新

职责:

Subject对象的职责是维护一个包含Observer的列表,同时在状态对象State发生变化之后,通过调用Observer对象的update() 方法通知它们

Observer对象的职责是将自己注册或者注销于Subject对象(通过调用Subject的attach()和detach()方法)

不足:可以看到,在观察模式的最基本实现中,Observer需要显式的将自己注册或注销于Subject,这里会有一个叫做失效监听者的问题(lapsed listener problem),甚至会有内存泄漏的危险:由于对象Subject显式的持有一个指向Observer的指针,那么一旦Observer在不再需要监听的时候注销失败,那么它将永远不会被GC系统回收掉。

演进:如上所述,由于Observer和Subject的紧耦合,除了会有内存泄漏的危险,在可扩展性、速度、信息恢复和可维护性方面都有诸多限制,因此观察者模式演进成订阅-发布模式(publish-subscribe pattern),即在Observer和Subject之间引入消息队列、消息处理对象、阶段等概念来充分解耦。

二、发布-订阅模式

发布-订阅模式是消息模式(messaging pattern,也叫事件驱动模式)的一种,同时也是我们通常意义上的消息队列范式的原型。消息的发送方通常称为发布者(Publisher),消息的接受方称为订阅者(Subscriber),消息实体会有多种类别(Topic)。发布者和订阅者并不直接交互,甚至互相都不感知,它们只是单纯的生产或者消费相应类别的消息实体,中间的处理(路由和分发)由消息处理对象完成,从而达到充分解耦和横向扩展等目的。

三、响应式编程

在介绍具体的响应式编程的概念之前,我们先类比思考一下什么是“响应式”,了解交互设计或者前端开发的同学一定对一个词特别敏感:自适应,意思是我们开发的网页,不管是在电脑屏幕上,还是在手机浏览器,甚至在用户实时拖动浏览器窗口大小时,都能完美无缺的显示在屏幕上。言下之意,网页的布局会随着浏览器行为的改变而改变,这就是很典型的响应式设计。

如何实现?不妨套用一下上文描述的观察者模式,让页面布局(Observer,也叫Subscriber)去订阅(subscribe)浏览器 (Subject,也叫Obserable)的某一个状态,比如此处的窗口大小,一旦该状态发生变化,浏览器便会通知网页布局,后者便针对新的窗口大小做出改变。

回到代码层面上,响应式编程实际上属于异步编程的范畴,Observer 完成对 Obserable 的订阅动作后并不会阻塞的去等待后者的状态变化,而是预留一些预处理方法来接收后来异步传输过来的信息,我们通常把这些预留的方法称为回调函数。描述至此,我们很轻易的能总结出响应式编程的两个优势:

可以并发的去执行相互独立的计算任务,提高效率

提高了代码的抽象程度,增加可维护性

接下来我们响应式编程框架 rxkotlin来具体阐述,它是众多实现了响应式编程设计思想的框架之一:

虽然 corda ledger 使用的是rxjava,但是原理相通,并且 rxkotlin 是基于 rxjava 实现(为其添加了一些便利的扩展方法),我们来运行一下官方提供的代码片段来直观的感受一下用 rxkotlin 进行响应式编程的魅力:

运行结果如下:

上面的代码可能对初学者不太友好,下面我们一步一步来操作:

一、创建 observer 对象,即观察者对象

这里注意几点:

= object 是 kotlin 中直接定义对象的便捷写法,省去了定义类再实例化的过程

Observer 是RxJava 内置接口,这里我们 override 了它的四个方法,分别在通知完成、注册、新通知来临、Obserable发生错误时被回调

二、创建 Observable 对象,并创建三个事件(回想一下上面描述的订阅-发布模式)

rxjava 中的操作符很丰富,这里使用的 create 便是其中之一,更为详细的操作符描述请见 http://reactivex.io/documentation/operators.html,后面有机会我会详细出一篇教程讲解

三、订阅

查看运行结果:

基本脉络很清晰,首先是创建订阅者和被订阅者,订阅动作发生之后,后面的代码执行完全是异步的事件驱动的。

最后跟大家探讨一下 rxjava 中一个非常重要的问题:线程调度,我们来改写一下之前的代码,让 observer 和 observable 的各个方法都打印一下他们所在的线程,同时我们让 observable 阶段性的产生一些事件:

可以看到,observer 大概每隔一秒钟生成一个事件,发送一个 float 类型的参数,同时打印出自己当前线程号;另一方面 observer 的几个回调方法也有打印了它们的线程号,来看看运行的结果:

也就是说,默认情况下 rx 是单线程的,无论是 observable 还是 observer 的方法都会在 subscribe 动作发生的那个线程执行,这往往不可接受,一旦 observer 进行了某些耗时操作阻塞了当前线程,后果很严重,来看下面的代码:

我们在 observer 的 onNext() 方法写了个死循环,可以看到 observable 就再也不能生成事件了:

幸运的是 rxjava 为我们提供了subscribeOn 和 observeOn 分别用来控制 subscription 的调用线程和 接受事件通知(observer 的 onNext/onError/onCompleted 函数)的线程。同时,rxjava 引入了 Schedulers 对象以简化我们对线程的控制。

再改一下我们代码,测试一下 subscribeOn 和 observeOn 的用途:

我们把主线程的 id,生成事件线程的 id 以及接收事件通知的线程 id 都打印了出来,同时阻塞主线程5秒钟:

显然,生成事件和接收事件两个动作处于不同的线程之中。我们再来对代码做一点有趣的改动,增加多个 observeOn 函数:

强调一下,这里的 map 隶属于抽象类 Observable ,和 kotlin 集合方法 map 方法类似,接收一个函数作为参数,将自己转换成另一个 Observable 对象(由于篇幅的原因我没办法详细讲解 rxjava 的操作符部分,但实际上这部分对我们利用 rxjava 或者 rxkotlin 写出优雅的代码非常有帮助),同时我们也打印出了每个 map 方法所在的线程号,结果如下:

可以看到在遇到 observeOn 之前,所有的 map 操作发生在一个线程,之后在另外一个线程。利用这个特性,我们可以在 Rx 数据流中不同地方设置不同的线程,看一下官方文档给的一个示意图和相关的解释:

As shown in this illustration, theSubscribeOnoperator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called.ObserveOn, on the other hand, affects the thread that the Observable will usebelowwhere that operator appears. For this reason, you may callObserveOnmultiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

最后总结一下 rxjava 内置的 Schedulers 对象:

ImmediateScheduler 并没有做任何线程调度。只是同步的执行任务。嵌套调用会导致任务被递归执行

TrampolineScheduler也是同步执行,但是不嵌套任务。而是把后来的任务添加到任务队列中,等前面的任务执行完了 再执行后面的

NewThreadScheduler 给每个任务创建一个新的线程。

ComputationScheduler计算线程,用于需要大量 CPU 计算的任务

IOScheduler用于执行 io 操作密集的任务

再次说明,响应式编程的内容非常丰富,这里只是给大家一个入门的思路,更加详细的资料请大家浏览http://reactivex.io/,后面的源码教程中涉及到具体的细节,仍然会和大家一起学习。从第一篇教程就跟大家分享过,区块链其实是一门组合技术,corda ledger涉及到的技术范畴也非常丰富,可以这么说,如果能把corda ledger这套系统理解透,不仅仅对学习其他区块链系统有帮助,对其他IT技术的学习也大有裨益!所以希望大家认真对待。有任何疑惑或者指正的地方欢迎大家与我们交流:)

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180517G1J02F00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券