RxJS Observable

在介绍 Observable 之前,我们要先了解两个设计模式:

  • Observer Pattern ——(观察者模式)
  • Iterator Pattern ——(迭代器模式)

这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。

Observer Pattern

观察者模式定义

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

  • 期刊出版方 - 负责期刊的出版和发行工作
  • 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。

观察者模式优缺点

观察者模式的优点:

  • 支持简单的广播通信,自动通知所有已经订阅过的对象
  • 目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用

观察者模式的缺点:

  • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
  • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃

观察者模式的应用

在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:

<button id="btn">确认</button>

function clickHandler(event) {
	console.log('用户已点击确认按钮!');
}
document.getElementById("btn").addEventListener('click', clickHandler);

上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler 函数。

观察者模式实战

Subject 类定义:

class Subject {
    
    constructor() {
        this.observerCollection = [];
    }
    
    registerObserver(observer) {
        this.observerCollection.push(observer);
    }
    
    unregisterObserver(observer) {
        let index = this.observerCollection.indexOf(observer);
        if(index >= 0) this.observerCollection.splice(index, 1);
    }
    
    notifyObservers() {
        this.observerCollection.forEach((observer)=>observer.notify());
    }
}

Observer 类定义:

class Observer {
    
    constructor(name) {
        this.name = name;
    }
    
    notify() {
        console.log(`${this.name} has been notified.`);
    }
}

使用示例:

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'

subject.registerObserver(observer1); // 注册观察者A
subject.registerObserver(observer2); // 注册观察者B
 
subject.notifyObservers(); // 通知观察者

subject.unregisterObserver(observer1); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified. # 输出一次
2(unknown) lolo has been notified. # 输出两次

需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)

Iterator Pattern

迭代器模式定义

迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。

迭代器模式的优缺点

迭代器模式的优点:

  • 简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
  • 封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。

迭代器模式的缺点:

  • 遍历过程是一个单向且不可逆的遍历

ECMAScript 迭代器

在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,就是 Iterator 在没有元素之后,执行 next会直接抛出错误;但后来经过一段时间讨论后,决定采更 functional 的做法,改成在取得最后一个元素之后执行 next 永远都回传 { done: true, value: undefined }

一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 donevalue 两个属性的对象。对象的取值如下:

  • 在最后一个元素前:{ done: false, value: elementValue }
  • 在最后一个元素后:{ done: true, value: undefined }

详细信息可以参考 - 可迭代协议和迭代器协议

ES 5 迭代器

接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:

function makeIterator(array){
    var nextIndex = 0;
    
    return {
       next: function(){
           return nextIndex < array.length ?
               {value: array[nextIndex++], done: false} :
               {done: true};
       }
    }
}

一旦初始化, next() 方法可以用来依次访问可迭代对象中的元素:

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done);  // true

ES 6 迭代器

在 ES 6 中我们可以通过 Symbol.iterator 来创建可迭代对象的内部迭代器,具体示例如下:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();

调用 next() 方法来获取数组中的元素:

> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }

ES 6 中可迭代的对象:

  • Arrays
  • Strings
  • Maps
  • Sets
  • DOM data structures (work in progress)

Observable

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

  • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 通过回调 next 方法向 Observer 发布事件。

自定义 Observable

如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer 作为参数然后返回另一个函数。

它的基本特征:

  • 是一个函数
  • 接受一个 Observer 对象 (包含 next、error、complete 方法的对象) 作为参数
  • 返回一个 unsubscribe 函数,用于取消订阅

它的作用:

作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:

DataSource - 数据源

class DataSource {
  constructor() {
    let i = 0;
    this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
  }
  
  emit(n) {
    const limit = 10;  // 设置数据上限值
    if (this.ondata) {
      this.ondata(n);
    }
    if (n === limit) {
      if (this.oncomplete) {
        this.oncomplete();
      }
      this.destroy();
    }
  }
  
  destroy() { // 清除定时器
    clearInterval(this._id);
  }
}

myObservable

function myObservable(observer) {
    let datasource = new DataSource(); // 创建数据源
    datasource.ondata = (e) => observer.next(e); // 处理数据流
    datasource.onerror = (err) => observer.error(err); // 处理异常
    datasource.oncomplete = () => observer.complete(); // 处理数据流终止
    return () => { // 返回一个函数用于,销毁数据源
        datasource.destroy();
    };
}

使用示例:

const unsub = myObservable({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
});

/**
* 移除注释,可以测试取消订阅
*/
// setTimeout(unsub, 500);

具体运行结果,可以查看线上示例

SafeObserver - 更好的 Observer

上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

  • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
  • complete 或者 error 触发之后再调用 next 方法是没用的
  • 调用 unsubscribe 方法后,任何方法都不能再被调用了
  • completeerror 触发后,unsubscribe 也会自动调用
  • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
  • nextcompleteerror是可选的。按需处理即可,不必全部处理

为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。SafeObserver 的具体实现如下:

class SafeObserver {
  constructor(destination) {
    this.destination = destination;
  }
  
  next(value) {
    // 尚未取消订阅,且包含next方法
    if (!this.isUnsubscribed && this.destination.next) {
      try {
        this.destination.next(value);
      } catch (err) {
        // 出现异常时,取消订阅释放资源,再抛出异常
        this.unsubscribe();
        throw err;
      }
    }
  }
  
  error(err) {
    // 尚未取消订阅,且包含error方法
    if (!this.isUnsubscribed && this.destination.error) {
      try {
        this.destination.error(err);
      } catch (e2) {
        // 出现异常时,取消订阅释放资源,再抛出异常
        this.unsubscribe();
        throw e2;
      }
      this.unsubscribe();
    }
  }

  complete() {
    // 尚未取消订阅,且包含complete方法
    if (!this.isUnsubscribed && this.destination.complete) {
      try {
        this.destination.complete();
      } catch (err) {
        // 出现异常时,取消订阅释放资源,再抛出异常
        this.unsubscribe();
        throw err;
      }
      this.unsubscribe();
    }
  }
  
  unsubscribe() { // 用于取消订阅
    this.isUnsubscribed = true;
    if (this.unsub) {
      this.unsub();
    }
  }
}

myObservable - 使用 SafeObserver

function myObservable(observer) {
  const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象
  const datasource = new DataSource(); // 创建数据源
  datasource.ondata = (e) => safeObserver.next(e);
  datasource.onerror = (err) => safeObserver.error(err);
  datasource.oncomplete = () => safeObserver.complete();

  safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法
    datasource.destroy();
  };
  // 绑定this上下文,并返回unsubscribe方法
  return safeObserver.unsubscribe.bind(safeObserver); 
}

使用示例:

const unsub = myObservable({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
});

具体运行结果,可以查看线上示例

Operators - 也是函数

Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符:

Observable 实现:

class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }
  
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    safeObserver.unsub = this._subscribe(safeObserver);
    return safeObserver.unsubscribe.bind(safeObserver);
  }
}

map 操作符实现:

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}

具体运行结果,可以查看线上示例

改进 Observable - 支持 Operator 链式调用

如果把 Operator 都写成如上那种独立的函数,我们链式代码会逐渐变丑:

map(map(myObservable, (x) => x + 1), (x) => x + 2);

对于上面的代码,想象一下有 5、6 个嵌套着的 Operator,再加上更多、更复杂的参数,基本上就没法儿看了。

你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的Operator并生成一个最终的Observable,不过这意味着要写更复杂的 Operator,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:

pipe(myObservable, map(x => x + 1), map(x => x + 2));

理想情况下,我们想将代码用更自然的方式链起来:

myObservable.map(x => x + 1).map(x => x + 2);

幸运的是,我们已经有了这样一个 Observable 类,我们可以基于 prototype 在不增加复杂度的情况下支持多 Operators 的链式结构,下面我们采用prototype方式再次实现一下 Observable

Observable.prototype.map = function (project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return this.subscribe(mapObserver);
    });
};

现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 Observable 类,然后在子类中重写某些内容以优化程序。

接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。

Hot Observable vs Cold Observable

Hot Observable

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。

const socket = new WebSocket('ws://someurl');
const source = new Observable((observer) => {
  socket.addEventListener('message', (e) => observer.next(e));
});

Cold Observable

Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。并且 Cold Observable 和 Subscriber 只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个 Subscriber 的时候,他们各自的事件是独立的。

const source = new Observable((observer) => {
  const socket = new WebSocket('ws://someurl');
  socket.addEventListener('message', (e) => observer.next(e));
  return () => socket.close();
});

一个 Observable 是 Hot 还是 Cold,都是相对于生产者而言的,如果每次订阅的时候,外部的生产者已经创建好了,那就是 Hot Observable,反之,如果每次订阅的时候都会产生一个新的生产者,那就是 Cold Observable。

Pull vs Push

Pull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。

什么是Pull?

在 “拉” 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。

每一个 JavaScript 函数都是一个 “拉” 体系,函数是数据的生产者,调用函数的代码通过 ‘’拉出” 一个单一的返回值来消费该数据。

const add = (a, b) => a + b;
let sum = add(3, 4);

ES6介绍了 iterator迭代器Generator生成器 — 另一种 “拉” 体系,调用 iterator.next() 的代码是消费者,可从中拉取多个值

什么是Push?

在 “推” 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。

Promise(承诺) 是当今 JS 中最常见的 “推” 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。

RxJS 引入了 Observables (可观察对象),一个全新的 “推” 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 “推送给” Observer (观察者)。

生产者

消费者

pull拉

被请求的时候产生数据

决定何时请求数据

push推

按自己的节奏生产数据

对接收的数据进行处理

接下来我们来看张图,从而加深对上面概念的理解:

Observable vs Promise

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ

单值

多值

拉取(Pull)

函数

遍历器

推送(Push)

Promise

Observable

  • Promise
    • 返回单个值
    • 不可取消的
  • Observable
    • 随着时间的推移发出多个值
    • 可以取消的
    • 支持 map、filter、reduce 等操作符
    • 延迟执行,当订阅的时候才会开始执行

延迟计算 & 渐进式取值

延迟计算

所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。

import { from } from "rxjs";
import { map } from "rxjs/operators";

const source$ = from([1, 2, 3, 4, 5]);
const example$ = source$.pipe(map(x => x + 1));

上面的示例中,因为 example$ 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下:

const source = [1,2,3,4,5];
const example = source.map(x => x + 1);

以上代码运行后,example 中就包含已运算后的值。

渐进式取值

数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下:

const source = [1,2,3,4,5];
const example = source
				.filter(x => x % 2 === 0) // [2, 4]
              	.map(x => x + 1) // [3, 5]

关于数组中的 mapfilter 的详细信息,可以阅读 - RxJS Functional Programming

为了更好地理解数组操作符的运算过程,我们可以查看 Array Compute

虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下:

import { from } from "rxjs";
import { filter, map } from "rxjs/operators";

const source$ = from([1, 2, 3, 4, 5]);
const example$ = source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x + 1)
);

example$.subscribe(console.log);

以上代码的输出结果:

3
5

参考资源

  • 观察者模式
  • MDN - 迭代器和生成器
  • 构建流式应用—RxJS详解
  • 让我们一起来学习RxJS
  • Learning Observable By Building Observable
  • 30天精通RxJS - 什么是Observable
  • hot-vs-cold-observables

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • TypeScript 函数中的 this 参数

    从 TypeScript 2.0 开始,在函数和方法中我们可以声明 this 的类型,实际使用起来也很简单,比如:

    阿宝哥
  • TypeScript 设计模式之观察者模式

    观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

    阿宝哥
  • 你不知道的 WeakMap

    相信很多读者对 ES6 引入的 Map 已经不陌生了,其中的一部分读者可能也听说过 WeakMap。既生 Map 何生 WeakMap?带着这个问题,本文将围绕...

    阿宝哥
  • JavaScript中的this/call/apply/bind

    this 关键字在大部分语言中都是一个重要的存在,JS中自然不例外,其表达的意义丰富多样甚至有些复杂,深刻理解this是学习JS、面向对象编程非常重要的一环。

    刘亦枫
  • 中国台湾精益老专家:DevOps 变革的剖析与实践

    今天的主题,我们项目开始之初第一件事情是做什么?你接到一个项目要开始做了,最开始你要做什么?这个非常重要,你一定要知道,第一件事就是要看见全貌。通常我们以为我们...

    DevOps时代
  • 如何实现标准的dispose

    前面的文章我们说过,如果对象包含非托管资源那么就必须要正确的清理,现在我们就来说一下如何清理。针对非托管资源 .NET 会采用一套标准的模式来完成清理工作。也就...

    喵叔
  • Java-“this”和“类名.this”以及“类名.class”的区分和详解

    对于以上三个语法结构的区分,需要先理解Class类 所有对象的类以及调用了静态方法的类都需要在对象创建之前在JVM虚拟机中加载,加载内容被称为“类对象”,每个...

    Fisherman渔夫
  • React生命周期简单分析

    1.React16.3的发布带来了一些新的特性, 除了新的ContextAPI之外, 还对生命周期做了部分修改, 为了支持未来的异步渲染特性, 一下生命周期将被...

    IMWeb前端团队
  • 给学c加加小白的50条忠告

    1.请热爱C++! 2.不要被 VC、BCB、BC、MC、TC等词汇所迷惑——他们都是集成开发环境,而我们要学的是一门语言; 3.会用Visual C++,并不...

    企鹅号小编
  • SpringBoot全局异常处理

    全局异常处理是个比较重要的功能,一般在项目里都会用到。 我大概把一次请求分成三个阶段,来分别进行全局的异常处理。 一:在进入Controller之前,譬...

    天涯泪小武

扫码关注云+社区

领取腾讯云代金券