前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出 RxJS 之 Hello RxJS

深入浅出 RxJS 之 Hello RxJS

作者头像
Cellinlab
发布2023-05-17 20:15:24
2.2K0
发布2023-05-17 20:15:24
举报
文章被收录于专栏:Cellinlab's BlogCellinlab's Blog

# Observable 和 Observer

顾名思义,Observable 就是“可以被观察的对象”即“可被观察者”,而 Observer 就是“观察者”,连接两者的桥梁就是 Observable 对象的函数 subscribe

RxJS 中的数据流就是 Observable 对象,Observable 实现了下面两种设计模式:

  • 观察者模式(Observer Pattern)
  • 迭代器模式(Iterator Pattern)

# 观察者模式

观察者模式要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念,当然,“分”很容易,关键是如何“治”。

观察者模式对“治”这个问题提的解决方法是这样,将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产生事件,它会通知所有注册挂上号的观察者,而不关心这些观察者如何处理这些事件,相对的,观察者可以被注册上某个发布者,只管接收到事件之后就处理,而不关心这些数据是如何产生的。

在 RxJS 的世界中,Observable 对象就是一个发布者,通过 Observable 对象的 subscribe 函数,可以把这个发布者和某个观察者(Observer)连接起来。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/of';

// source$就是一个 Observable 对象,作为发布者,它产生的“事件”就是连续的三个整数
const source$ = Observable.of(1, 2, 3);
// 扮演观察者的是 console.log 函数,不管传入什么“事件”,它只管把“事件”输出到 console 上
source$.subscribe(console.log);

观察者模式带来的好处很明显,这个模式中的两方都可以专心做一件事,而且可以任意组合,也就是说,复杂的问题被分解成三个小问题:

  • 如何产生事件,这是发布者的责任,在 RxJS 中是 Observable 对象的工作
  • 如何响应事件,这是观察者的责任,在 RxJS 中由 subscribe 的参数来决定
  • 什么样的发布者关联什么样的观察者,也就是何时调用 subscribe

# 迭代器模式

迭代者(Iterator,也称为“迭代器”)指的是能够遍历一个数据集合的对象,因为数据集合的实现方式很多,可以是一个数组,也可以是一个树形结构,也可以是一个单向链表……迭代器的作用就是提供一个通用的接口,让使用者完全不用关心这个数据集合的具体实现方式。

设计模式的实现方式很多,但是不管对应的函数如何命名,通常都应该包含这样几个函数:

  • getCurrent,获取当前被游标所指向的元素
  • moveToNext,将游标移动到下一个元素,调用这个函数之后,getCurrent 获得的元素就会不同
  • isDone,判断是否已经遍历完所有的元素
代码语言:javascript
复制
const iterator = getIterator();
while (!iterator.isDone()) {
  const current = iterator.getCurrent();
  // do something with current
  iterator.moveToNext();
}

在 RxJS 中,作为迭代器的使用者,并不需要主动去从 Observable 中“拉”数据,而是只要 subscribeObservable 对象之后,自然就能够收到消息的推送,这就是观察者模式和迭代器两种模式结合的强大之处。

# 创造 Observable

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';

// 作为参数传给 Observable 构造函数
// 接受一个 observer 参数,在函数体内调用参数 observer 的 next 函数将数据“推”给 Observer
const onSubscribe = observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
};

// 调用 Observable 构造函数,产生数据流 source$
// onSubscribe 会等待 source$ 的 subscribe 被调用
const source$ = new Observable(onSubscribe);

// 观察者 theObserver
const theObserver = {
  next: item => console.log(item),
};

// 通过 subscribe 函数将 theObserver 和 source$ 关联起来
// subscribe 调用过程中,会对 theObserver 进行包装(或者代理)作为 onSubscribe 的参数,并且会调用 onSubscribe 函数
source$.subscribe(theObserver);
// 1
// 2
// 3

创建 Observable 对象也就是创建一个“发布者”,一个“观察者”调用某个 Observable 对象的 subscribe 函数,对应的 onSubscribe 函数就会被调用,参数就是“观察者”对象,onSubscribe 函数中可以任意操作“观察者”对象。这个过程,就等于在这个 Observable 对象上挂了号,以后当这个 Observable 对象产生数据时,观察者就会获得通知。

在 RxJS 中,Observable 是一个特殊类,它接受一个处理 Observer 的函数,而 Observer 就是一个普通的对象,没有什么神奇之处,对 Observer 对象的要求只有它必须包含一个名为 next 的属性,这个属性的值是一个函数,用于接收被“推”过来的数据。

# 跨越时间的 Observable

Observer 是被“推”数据的,在执行过程中处于被动地位,所以,控制节奏的事情,还是应该交给 Observable 来做,Observable 既然能够“推”数据,那同时负责推送数据的节奏,完全合理。

代码语言:javascript
复制
const onSubscribe = observer => {
  let number = 1;
  const handle = setInterval(() => {
    observer.next(number);
    number++;
    if (number > 3) {
      clearInterval(handle);
    }
  }, 1000);
};

Observable 推送数据可以有时间间隔的特性使得异步操作十分容易,因为对于观察者 Observer,只需要被动接受推送数据来处理,而不用关心数据何时产生。

# 永无止境的 Observable

Observable 对象中吐出来的数据可以是无穷的。

代码语言:javascript
复制
const onSubscribe = observer => {
  let number = 1;
  const handle = setInterval(() => {
    observer.next(number);
    number++;
  }, 1000);
};

假如不中断程序,一直运行,程序并不会消耗更多内存,因为 Observable 每次吐出的数据被 Observer 消费了,不会存在堆积。如果把数据堆积到一个数组中,然后挨个处理数组中的元素,内存消耗会随数组大小改变。

# Observable 的完结

调用 Observernext 只能表达“这是现在要推送的数据”,next 没法表达“已经没有更多数据了”,所以,为了让 Observable 有机会告诉 Observer “已经没有更多数据了”,需要有另外一种通信机制,在 RxJS 中,实现这种通信机制用的就是 Observercomplete 函数。

代码语言:javascript
复制
const theObserver = {
  next: value => console.log(value),
  complete: () => console.log('complete')
};

对应的 Observable 对象需要显示调用 complete 函数

代码语言:javascript
复制
const onSubscribe = observer => {
  let number = 1;
  const handle = setInterval(() => {
    observer.next(number);
    number++;
    if (number > 3) {
      clearInterval(handle);
      observer.complete();
    }
  }, 1000);
};

# Observable 的出错处理

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';

const onSubscribe = observer => {
  observer.next(1);
  observer.error('error');
  observer.complete();
};

const source$ = new Observable(onSubscribe);

const theObserver = {
  next: value => console.log(value),
  error: error => console.log(error),
  complete: () => console.log('complete')
};

source$.subscribe(theObserver);
// 1
// error

在 RxJS 中,一个 Observable 对象只有一种终结状态,要么是完结(complete),要么是出错(error),一旦进入出错状态,这个 Observable 对象也就终结了,再不会调用对应 Observernext 函数,也不会再调用 Observercomplete 函数;同样,如果一个 Observable 对象进入了完结状态,也不能再调用 Observernexterror

# Observer 的简单形式

Observer 对象都是一个对象,可以包含 nextcompleteerror 三个方法,用于接受 Observable 的三种不同事件,如果根本不关心某种事件的话,也可以不实现对应的方法。

为了让代码更加简洁,没有必要创造一个 Observer 对象,subscribe 也可以直接接受函数作为参数,第一个参数如果是函数类型,就被认为是 next,第二个函数参数被认为是 error,第三个函数参数被认为是 complete

代码语言:javascript
复制
source$.subscribe(
  value => console.log(value),
  error => console.log(error),
  () => console.log('complete')
);

如果不关心异常处理,但关心结束:

代码语言:javascript
复制
source$.subscribe(
  value => console.log(value),
  null,
  () => console.log('complete')
);

# 取消订阅 Obsservable

onSubscribe 函数可以返回一个对象,对象上可以有一个 unsubscribe 函数

代码语言:javascript
复制
const onSubscribe = observer => {
  let number = 1;
  const handler = setInterval(() => {
    observer.next(number++);
  }, 1000);
  return {
    unsubscribe: () => clearInterval(handler)
  };
};

const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(
  items => console.log(items)
);

setTimeout(() => {
  subscription.unsubscribe();
}, 3500);

虽然 unsubscribe 函数调用之后,作为 Observer 不再接受到被推送的数据,但是作为 Observablesource$ 并没有终结,因为始终没有调用 complete ,只不过它再也不会调用 next 函数了。

代码语言:javascript
复制
const onSubscribe = observer => {
  let number = 1;
  const handle = setInterval(() => {
    console.log('in onSubscribe', number);
    observer.next(number++);
  }, 1000);
  return {
    unsubscribe: () => {
      // clearInterval(handle);
    },
  };
};

// inSubscribe 1
// 1
// inSubscribe 2
// 2
// inSubscribe 3
// 3
// inSubscribe 4
// inSubscribe 5
// inSubscribe 6

Observable 对象 source 在 unsubscribe 函数调用之后依然在不断调用 next 函数,但是,unsubscribe 已经断开了 source 对象和 Observer 的连接,所以,之后无论 onSubscribe 中如何调用 next,Observer 都不会作出任何响应。

这是 RxJS 中很重要的一点:Observable 产生的事件,只有 Observer 通过 subscribe 订阅之后才会收到,在 unsubscribe 之后就不会再收到。

# Hot Observable 和 Cold Observable

假设有这样的场景,一个 Observable 对象有两个 Observer 对象来订阅,而且这两个 Observer 对象并不是同时订阅,第一个 Observer 对象订阅N秒钟之后,第二个 Observer 对象才订阅同一个 Observable 对象,而且,在这 N 秒钟之内,Observable 对象已经吐出了一些数据。现在问题来了,后订阅上的 Observer ,是不是应该接收到“错过”的那些数据呢?

  • 选择 A:错过就错过了,只需要接受从订阅那一刻开始 Observable 产生的数据就行
  • 选择 B:不能错过,需要获取 Observable 之前产生的数据

RxJS 考虑到了这两种不同场景的特点,让 Observable 支持这两种不同的需求,对应于选择 A,称这样的 Observable 为 Hot Observable,对于选择 B,称之为 Cold Observable。

如果设想有一个数据“生产者”(producer)的角色,那么,对于 Cold Observable,每一次订阅,都会产生一个新的“生产者”。

每一个 Cold Observable 概念上都可以理解成对每一次 subscribe 都产生一个“生产者”,然后这个生产者产生的数据通过 next 函数传递给订阅的 Observer

代码语言:javascript
复制
const cold$ = new Observable(observer => {
  const producer = new Producer();
  // observer 去接受 producer 生产的数据
});

对于一个 Hot Observable,概念上是有一个独立于 Observable 对象的“生产者”,这个“生产者”的创建和 subscribe 调用没有关系,subscribe 调用只是让 Observer 连接上“生产者”而已:

代码语言:javascript
复制
const producer = new Producer();
const hot$ = new Observable(observer => {
  // observer 去接受 producer 生产的数据
});

Observable 称为“发布者”(publisher)而不是“生产者”,有意回避了“生产者”这个词,就是因为在 Hot Observable 中,Observable 明显并不产生数据,只是数据的搬运工。

一个 Observable 是 Hot 还是 Cold,是“热”还是“冷”,都是相对于生产者而言的,如果每次订阅的时候,已经有一个热的“生产者”准备好了,那就是 Hot Observable,相反,如果每次订阅都要产生一个新的生产者,新的生产者就像汽车引擎一样刚启动时肯定是冷的,所以叫 Cold Observable。

# 操作符

对于现实中复杂的问题,并不会创造一个数据流之后就直接通过 subscribe 接上一个 Observer,往往需要对这个数据流做一系列处理,然后才交给 Observer。就像一个管道,数据从管道的一段流入,途径管道各个环节,当数据到达 Observer 的时候,已经被管道操作过,有的数据已经被中途过滤抛弃掉了,有的数据已经被改变了原来的形态,而且最后的数据可能来自多个数据源,最后 Observer 只需要处理能够走到终点的数据。

在 RxJS 中,组成数据管道的元素就是操作符,对于每一个操作符,链接的就是上游(upstream)和下游(downstream)。

对一个操作符来说,上游可能是一个数据源,也可能是其他操作符,下游可能是最终的观察者,也可能是另一个操作符,每一个操作符之间都是独立的,正因为如此,所以可以对操作符进行任意组合,从而产生各种功能的数据管道。

在 RxJS 中,有一系列用于产生 Observable 函数,这些函数有的凭空创造 Observable 对象,有的根据外部数据源产生 Observable 对象,更多的是根据其他的 Observable 中的数据来产生新的 Observable 对象,也就是把上游数据转化为下游数据,所有这些函数统称为操作符。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/operators/map';

const onSubscribe = observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
};

const source$ = Observable.create(onSubscribe);

// 对于 Observable 的 map,是对其中每一个数据映射为一个新的值,产生一个新的 Observable 对象
source$.map(x => x * x).subscribe(console.log);

// 1
// 4
// 9

总之,操作符就是用来产生全新Observable对象的函数。

# 弹珠图

根据弹珠图的传统,竖杠符号|代表的是数据流的完结,对应调用下游的 complete 函数。符号 × 代表数据流中的异常,对应于调用下游的 error 函数。

为了描述操作符的功能,弹珠图中往往会出现多条时间轴,因为大部分操作符的工作都是把上游的数据转为传给下游的数据,在弹珠图上必须把上下游的数据流都展示出来。

可以在 RxJS Marbles (opens new window)RxViz (opens new window) 查看和编写弹珠图。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022/7/17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # Observable 和 Observer
    • # 观察者模式
      • # 迭代器模式
        • # 创造 Observable
          • # 跨越时间的 Observable
            • # 永无止境的 Observable
              • # Observable 的完结
                • # Observable 的出错处理
                  • # Observer 的简单形式
                  • # 取消订阅 Obsservable
                  • # Hot Observable 和 Cold Observable
                  • # 操作符
                  • # 弹珠图
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档