响应式编程中 Stream 对象的实现原理

作者:caorich

本文首先简单介绍响应式编程的应用,随之详细阐述如何实现一个轻量的响应式的函数库。

响应式编程

这篇文章介绍一种编程泛型,叫做响应式编程。将响应式称作“编程泛型”可能有些夸大其作用范畴,不过通过引入响应式确实会改变我们对特定问题的思考方法,就像刚接触 redux 带来的函数式编程一样。

响应式和从前听说的“面向事件编程”很像,是针对事件的一种处理办法,且比从前的on\off\emit方法来处理事件,响应式会做得更加的优雅。

响应式编程基于“流(Stream)”这个对象。“流”是一个管道,管道中流淌的是事件携带的数据,我们在这个管道的一个截面监听事件,当该事件流淌通过截面时,触发我们的事件句柄。

无论是异步Ajax的返回、用户UI事件、还是自定义的数据,都可以作为管道数据的来源,利用统一的api进行处理。

来看一看代码吧~

首先引入一个响应式的函数库,我用的most.js,还可以选择:

下面的代码,你将每个一秒接受到一个'hello'

var most = require('most')
most.periodic(1000,'hello')
    .observe(console.info)

下面的代码,你将延时一秒获得鼠标的move信息。

most.fromEvent('mousemove', mainDiv)
    .delay(1000)
    .observe(handler)

利用此代码可以做出一个很有意思的效果:

更多的例子,可以在各开源函数库的examples中查看。这里就不再复述。

下面的文章内容,将讨论如何手动实现一个轻量化的响应式函数库。从设计到代码,都有阐述。

如何构建一个stream

以下内容基于我自己写的一个响应式库:Praan.js(还在开发阶段,目前还未实现全部的接口)。我们从一个简单的例子入手,请看下面的代码:

Praan.periodic(1000, 1)
    .map(data => data + 1)
    .observe(console.info)

Praan的开发过程也是从实现以上这一小段代码入手的。这三行代码每一行分别实现了三个功能:

  • 创建一个新的stream
  • 利用map映射一个新的流
  • 利用observe(或者subscribe)观察流发射出的事件

第三项是最复杂的,下面我们依次讲起。

创建一个stream

要创建一个stream,需要知道stream对象涵盖的方法和属性。这是一个引人深思的问题,设想现实生活中洗手间的一条水管,包含哪些属性呢?

  • 首先是水管,无论是铁质的还是PV的,我们首先需要一个水管容器;
  • 然后是水,水管需要有流淌的实体
  • 最后是水源,不同的水源供水习惯不一样。并且,水源负责供水。

这样,可以粗略的构建个Stream类。

function Stream(source){   //Stream就是水管,只是个容器而已
    this.source = source;
}
function Source(law, water){   //水源
    this.law = law;           // 水源的供水习惯
    this.water = water;       // 水源里的水
}

然后再来看最初代码的第一行:

Praan.periodic(1000, 1)

其实就是调用了StreamSource的构造方法:

Praan.periodic = function(periodic, value){
    return new Stream(new Source(periodic, value))
}
// 利用of方法我们可以写得更加简洁一点
    return Stream.of(Source.of(periodic, value));

很明显,这里的periodic参数就是水源的供水习惯,它是周期性的,一秒供水一次,value参数就是水源的供水,它是一个number值:“1”。

通过map映射一个新的stream

如何映射出一个新的流呢?熟悉数组map函数的同学很快想出了答案:

Stream.prototype.map = function(fn){
    this.source.map(fn);
}
Source.prototype.map = function(fn){
    this.value = fn(this.value);
}

貌似是一个很好的办法,不过,还有一种我们没有碰到的情况,那就是value不存在的情况。例如,有些source是来自于promise,只有当promise.resolve了之后,source才会知道value具体是什么,我们不能在此之前map一个value。另外,函数式编程里面讲究惰性执行,这样做违背了惰性原则。

要解决这个问题很简单,不要去直接map一个value,而是将map的方法放进一个队列里,真正需要它的时候再拿出来执行,而这个队列,我们取名叫做sinks,当然,sinks里面装的,那就是大名鼎鼎的sink了。

function Source(law, water){
    //...
    this.sinks = [];
}
Source.prototype.map = function(fn){
    this.sinks.push(fn);
}

sink这个概念,经常在开源的reactive库里看到,但是又很少看到一个准确的、易于理解的解释。sink大致可以翻译为“槽”,一个sink就是一个事件的加工厂。Stream里面流动的是事件,而当一个事件真正被发出时,这个事件会经过一个个sink,改变了最初的模样,最终到达观察者的手中。这样的描述是不是很像redux中的reducer?state通过一个个reducer,将最终的产物的交给store。sink也一样,一个water经过一个个水槽加工厂,将最终的产物交给observer。其中道理没想象中神秘。

下图展示了一个请求网络资源的sink组合。黄色背景的方框即是一个个sink。

对于sink函数的设计也是需要详细讨论的,最初,每个sink函数都被简单的设计为输入输出的纯函数。当source发出一个data,经过重重sinks,最后输出的结果用一个reduce函数就可以完成:

var finalData = this.sinks.reduce(function(seed, sink){
    return sink(seed);
}, data);

这样设计期初看起来非常优雅,要是所有函数都是没有副作用的纯函数该多好啊~事实上,这种方法我们没法完成异步的操作。例如,需要加入一个delaySink。该sink会hold住data,若干秒之后再将它交给下一个sink。这样的需求纯函数的sink是没办法完成的。所以我参照了redux中间件的代码,如果每一个sink都能取得对下一个sink的引用,那么就可以由sink来控制数据的向下传递时间和方式。redux中间件的设计方法其实也参考了express的中间件,可以去看这里的文章了解中间件的代码实现。

最终,sink函数的接口被设计成这样:

source.map(function(value, time, nextSink, scheduler){
    var nextValue = value;
    // do something will nextValue
    nextSink.event(nextValue);
    // if the call is async, you can also handler it.
    // new Promise(...).then(_=>nextSink.event(_))
});

例如,对于Stream.prototype.map的实现就是:

Stream.prototype.map = function(fn){
    return Stream.of(this.source.map(function(value, time, nextSink, scheduler){
        nextSink.event(fn(value), time, scheduler)
    }));
}

这样就实现了Praan.periodic(1000,1).map(data => data + 1)这样的接口调用。

利用observe观察流发射出的事件

回顾一下,流发射出的事件被描述为Source的water属性。water在流出Source之前会经过sink的层层加工。然后我们如何观察这段水流(事件)呢?

经过的第一和第二步骤,形形色色的stream被创造了出来,但是这个水管中还是没有水在流动,我们还要激活它,激活水的源头。给Source构造函数再加一个sluice(开闸)方法,当调用sluice方法后,这个水源才真正源源不断的输送水流(事件)。

Source.prototype.sluice = function(){
    // 问题来了
}

问题来了,sluice方法里面要怎样实现呢?

注意到,一旦sluice,source就需要按需触发事件,本质上还是需要setTimeout来完成。不同的source需要触发事件的方式不一样,本文的例子,periodic源是需要每隔1秒周期性的发射水流(事件),换做其他流,可能就需要其他的事件发射规律,这就涉及到:

  1. 计算出时间点(timestamp)
  2. 根据timestamp利用setTimeout往js的时间队列里添加方法
  3. 设计一个类来管理1和2

第一点,需要实现一个TaskFlow,负责计算每个task应该在多久之后执行;第二点,我们实现一个Timer,职责很简单,就是根据传入的timestamp往队列里添加方法;第三点,我们实现一个Scheduler,负责管理TaskFlow和Timer。

这三者的实现代码比较繁杂,Praan的这部分代码也是参考mostjs完成的。有兴趣可以去看源码。

Scheduler类有一个类似事件循环的机制,每当有task分配给scheduler,它会执行这个task,然后再在队列里寻找是否还有下一个task。其流程图如下:

有了Scheduler类对于时序的控制,激活一个source就等于往Scheduler里面丢一个task,Scheduler会自动管理好这个task。这个接口调用起来也很简便:

Source.prototype.sluice = function(){
    scheduler.schedule(this.delay, this.periodic,  Task.of(this.value, this.sinks));
}

scheduler每一秒钟会调用task.run一次,task.run会调用第一个sink,然后第一个sink按需调用nextSink方法将数据交给下一个sink。代码的实现请看中间件的实现方法。

最后,既然掌握了激活source的方法,剩下的就是观察source发射的事件了,原理更简单,给source.sinks的最后加入一个观察用的sink即可:

function watch(fn){
    return (value, time, nextSink, scheduler) => fn(value)
}
Stream.prototype.observe = function(fn){
    this.source.map(watch(fn));
    this.source.sluice();
}

这样,除了最复杂的时序控制的Scheduler的代码实现,我们已经阐述了一个Stream的建模方法。可以参考下图,再回到文章看不懂的地方。

原文链接:http://ivweb.io/topic/589604b996a66570c5eb1fd1

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JAVA高级架构

11 个简练的 Java 性能调优技巧

大多数开发者认为性能优化是一个复杂的话题,它需要大量的工作经验和相关知识理论。好吧,这也不完全错。优化一个应用做到性能最优化可能不是件容易的任务,但是这并不意味...

3396
来自专栏向治洪

android性能优化

前言 性能优化本身是一个很大的主题,涵盖程序的方方面面,任何不慎的操作,都有可能对性能造成比较大的影响,要知道程序的性能是可以累加的,多处的性能低下,会影响整体...

1915
来自专栏小鄧子的技术博客专栏

Architecting Android with RxJava

三个月以来,我翻译了一些关于RxJava的文章,说实话这些翻译,真的搞得我很头疼,那么现在是时候回来写点什么了。

691
来自专栏Java架构师历程

安卓 jni 开发错误 UnsatisfiedLinkError: Native method not found

很明显是因为 native 方法找不到,google 下发现该错误出现频率还蛮高的,基本有两种:

1384
来自专栏生信技能树

用GenePred注释文件进行数据分析

编者注:前几天在生信技能树我们发现了一个神奇的帖子(http://www.biotrainee.com/thread-928-1-1.html ), 作者用一种...

44714
来自专栏企鹅号快讯

Bruce.Wang-记一次对JS木马分析

0×00 前言 随着 javascript 这类脚本语言的快速发展,它能展现的内容越来越多样,所运用的功能变得越来越强大。当运用在 web 开发上时,增加了更多...

2046
来自专栏tkokof 的技术,小趣及杂念

移动开发之浅析cocos2d-x的中文支持问题

  题记:这阵子一直在学习cocos2d-x,其跨平台的特性确实让人舒爽,引擎的框架概念也很成熟,虽然相应的第三方工具略显单薄,但也无愧是一件移动开发的利器啊,...

762
来自专栏北京马哥教育

shell十三问,为linux学习打基础(一)

本文整理并转自CU上的帖子[学习共享] shell 十三問?,此贴是2003年发表的,但却是相当不错的linux基础知识汇集贴,原帖主使用的台湾风格,本文加以简...

3364
来自专栏wOw的Android小站

[Objective-C] Block实现回调和简单的学习思考

关于Objective-C的回调,最常见的应该是用delegate代理实现。不过代理的实现比起Block要更基础,就不介绍了,下面总结一下Block回调的实现。

552
来自专栏web前端教室

[先行者周末课程] 日历组件的开发思路讲解&&日历组件在实际工作中的使用方式

各位同学们大家好,今天又到了周日,视频课程的时候。上次咱们讲的是日历组件。 简短的回顾一下上周的内容,免得同学们一时断篇,想不起来身在何方。日历这种东西,初学者...

19410

扫码关注云+社区