响应式编程和Rxjs库介绍

为什么要用响应式编程?

整合了Promise, 回调, 事件回调,Web Workers,Web Sockets

适用于强数据驱动,异步,动画等应用

什么是响应式编程?

谈响应式编程前,我们先纵向拓展一下。我们写一个程序的时候,我们的思维模式是怎么样的。

这里举一个例子: 假设我们到肯德基买东西吃。买了东西,一般给个收据。我们怎么实现这个收据对象。

设计API

收据对象:可能会有print方法,打印出我们的总消费金额。

计算器:还需要一个计算器。我们的收据对象需要知道多少钱,我们可能需要计算器去算。

实现

一种可能的实现

// 收据对象

class Receipt {

 constructor(calculator) {

     this.calc = calculator;

 }

 print(itemA, itemB) {

     console.log(`total receipt £$`);

 }

}

// 计算器

class Calculator {

 sum(...items) {

     var result = 0;

     for(var i=0; i

             result += items[i];

     }

     return result;

 }

}

// 使用

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator();

const receipt = new Receipt(calc);

receipt.print(pizza, beer);

Q:  宿主对象,这里的收据对象,依赖了计算器,并且需要知道使用对象计算器的API,也就是这里的sum。  这个其实是传统的依赖注入方式: 本质上都是函数参数的形式引入我们依赖的对象。 在javascript里面我们也经常这么干,依赖一个回调函数。

Q: sum方法简单的for循环。这里我们用i精确地记录了每一个步骤。而且使用的时候我们调用print,这种方式是命令式编程。

优化1

// 收据对象

class Receipt {

 constructor() {

 }

 print(total) {

     console.log(`total receipt £$`);

 }

}

// 计算器

class Calculator {

 sum(...items) {

     var result = 0;

     u._$forEach(items, function(i) {

         result += i;

     });

     return result;

 }

}

// 使用

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator();

const receipt = new Receipt();

receipt.print(calc.sum(pizza, beer));

收据对象内部不依赖计算器了,通过外部传递进去; 不关心变量i了,这是一种声明式编程,就是告诉你要做什么,不关系具体怎么做。 题外话:函数式编程是声明式编程的一种,但它不仅仅是声明式,它还有其他概念:比如纯函数(不可变数据结构:path copying; 结构共享),curry化等。代码中应尽量采用函数式的方式编写。

优化2

class Calculator {

 constructor(...items) {

     return Rx.Observable.from(items).reduce((acc, item) => (acc + item));

 }

}

class Receipt {

 constructor(observable$) {

     observable$.subscribe(value => console.log(`total receipt: £$`))

 }

}

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator([pizza, beer]);

const receipt = new Receipt(calc);

以上是采用响应式编程的方式。数据就像流水一样被传递。 任何东西都可以变为流。

原理

迭代器模式+观察者模式的结合,涉及两个主要对象:observable(被观察者:报纸), observer(观察者:订阅者) observable通过调用observer的next,complete,error三个方法通知消息给观察者。

迭代器模式+观察者模式的结合,涉及两个主要对象:

observable(被观察者:报纸), observer(观察者:订阅者)

observable通过调用observer的next,complete,error三个方法通知消息给观察者。

每次你创建一个Observable,相当于创建一份空白报纸。报纸的数据源,就是由传递给Observable的函数提供(如果是通过操作符创建Observable,也可以认为传递进去的参数就是数据源)。

数据源可以被操作符操作,然后数据就像流水一般被传递和处理。

最终流给每一个订阅者。

简单实现:

// Observable

function Observable (observer) {

observer.next('传递数据给观察者');

}

// Observer

var Observer = {

 next(value) {

     alert(`收到$`);

 }

};

// 订阅

Observable(Observer);

实现示例1:参照rxjs的用法

把构造方法里面的抽取一个函数,让外部传进来;观察者通过subscribe方法传入。 然后调用fn,把观察者作为参数。这样消息就传给观察者了。

// Observable

function Observable(fn) {

this.fn = fn;

}

 this.fn(observer);

}

// Observer

var Observer = {

 next(value) {

     alert(`收到$`);

 }

};

new Observable(function(observer2) {

 observer.next('123');

}).subsrcibe(Observer);

实现示例2:操作符of用法简单实现

// Observable

function Observable(fn) {

this.fn = fn;

}

Observable.of = function() {

 var args = [].slice.call(arguments);

 return new Observable(function(observer) {

     u._$forEach(args, function(i) {

         observer.next(i);

     });

 })

}

 this.fn(observer);

}

// Observer

var Observer = {

 next(value) {

     alert(`收到$`);

 }

};

Observable.of(1,2,3).subscribe(Observer);

操作符本质上就是函数,会返回一个新的Observable对象。rxjs5的时候,一般我们引用一个操作符的时候,会引用一个文件,它会把它挂在的原型上,以便链式调用。但rxjs6新增加了一个pipe函数,它接收一系列操作符,可以避免打包的时候额外依赖了一些无用的文件。

注意点:

Q:实现示例1中的observer2是不是传进去的observer?

事实上的实现是另外一个对象,一个代理或者叫安全对象,它也有next等api。为什么要弄一个一个新的? 主要原因有很多: + 我们的observer对象不一定都有next, error, complete方法,当然observer也可能是一个函数。 + 希望调用complete后或者取消订阅后,再调用next什么事也不要发生 + 异常处理

Q:订阅这个过程是有生命周期的,比如订阅开始,取消订阅等等,需要管理这个生命周期。

真实的实现会用一个subscription对象,用来管理生命周期。subscribe方法的返回是一个subscription对象,可以取消订阅。

observable, proxyobserver(observer2),subscription三者的关系:

Q: complete方法和unsubscribe的区别

complete执行后,所有的订阅者都不会接收到消息;unsubscribe只是说你这个观察者不再接手消息了。

Rxjs中还有一个Subject对象,这个对象既是一种Observable,也是一种Observer。

Q: 链式调用是如何实现的

因为操作符会返回一个新的observable对象(见实现示例2),所以链式调用是通过原型链实现的。

Q:Promise和Obserable的区别

Promise只能resolve一个值,但是Observable的subscribe方法接收到多个值。 也就说subscribe函数有点特殊,一般调用一个函数只会有一个输出,但是它有多个。

MVVM框架中如何应用响应式编程

架构和设计的区别:

架构指的是各个元素(model view) 之间是如何交互的;

设计指的是采用某个库,算法,设计模式。

响应式编程是一种设计。如何在我们的MVVM框架中应用我们的这种设计。

1 将事件回调函数转为事件流

var result = document.getElementById('result');

var source = Rx.Observable.fromEvent(document, 'mousemove');

var subscription = source.subscribe(e => result.innerHTML = e.clientX + ', ' + e.clientY);

2 将http请求转为流

var observable1$ = Rx.Observable.create(function(observer) {

 onload: function(data) {

   observer.next(data);

   observer.complete();

 },

 onerror: function(error) {

   observer.error(error);

 }

});

});

var observable2$ = Rx.Observable.create(function(observer) {

request("xxx")

 .then(function(data) {

   observer.next(data);

   observer.complete();

 })

 .catch(function(error) {

   observer.error(error);

 });

});

var subscription = merge(observable1$, observable2$).subscribe(function(

result

) {

console.log(result);

});

可以结合一些操作符,实现多个http请求并行,或者一个请求先执行,然后第二个请求等等。

3 更新视图之前操作数据

4 多个对象之间通过流传递数据

父子组件之间:传统的方式是通过$watch,现在多了一种方式流。 它有一个Backpressure背压概念:比如我们父组件每1s产生一个数据,子组件每1s就要更新一次UI,如果我们不做任何控制的话。但是流可以控制这个东西,再父组件控制下一下,每隔5s返回一下数据。debounce or throttle or sampleTime。另外就是不需要的数据能过滤一下。

5 promise方式改为observable方式 promise方式:

function _$isWechatMiniProgram(cb1, cb2) {

 if (wx && wx.miniProgram && wx.miniProgram.getEnv) {

     new Promise(function(resolve, reject) {

         wx.miniProgram.getEnv(function(res) {

             if (res.miniprogram) {

                 resolve();

             } else {

                 reject();

             }

         })

     }._$bind(this)).then(function() {

         cb1 && cb1();

     }).catch(function(e) {

         cb2 && cb2();

     });

 } else {

     cb2 && cb2();

 }

}

_$isWechatMiniProgram(function(){}, function(){})

observable方式:

function _$isWechatMiniProgram() {

 if (wx && wx.miniProgram && wx.miniProgram.getEnv) {

     return Rx.Observable.create(function(observer) {

         wx.miniProgram.getEnv(function(res) {

             observer.next(res.miniprogram);

         });

     })

 }

}

_$isWechatMiniProgram().subscribe(functon(isMiniProgram) {

 if(isMiniProgram) {

     cb1();

 }else {

     cb2();

 }

});

还有一种方式,通过forEach操作符:

function getData() {

return Rx.Observable.create(function() {

     fetch()

});

}

async function execute() {

await getData().forEach(v => console.log(v));

console.log('finish');

}

execute();

现有的实现框架

rxjs

实现

请参看我的一个实现XingMXTeam/sinnple-rx

关注知乎账户:毛毛星 获取更多内容

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181109G23KTC00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券