顾名思义,Observable
就是“可以被观察的对象”即“可被观察者”,而 Observer
就是“观察者”,连接两者的桥梁就是 Observable
对象的函数 subscribe
。
RxJS 中的数据流就是 Observable
对象,Observable
实现了下面两种设计模式:
观察者模式要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念,当然,“分”很容易,关键是如何“治”。
观察者模式对“治”这个问题提的解决方法是这样,将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产生事件,它会通知所有注册挂上号的观察者,而不关心这些观察者如何处理这些事件,相对的,观察者可以被注册上某个发布者,只管接收到事件之后就处理,而不关心这些数据是如何产生的。
在 RxJS 的世界中,Observable
对象就是一个发布者,通过 Observable
对象的 subscribe
函数,可以把这个发布者和某个观察者(Observer
)连接起来。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/of';
// source$就是一个 Observable 对象,作为发布者,它产生的“事件”就是连续的三个整数
const source$ = Observable.of(1, 2, 3);
// 扮演观察者的是 console.log 函数,不管传入什么“事件”,它只管把“事件”输出到 console 上
source$.subscribe(console.log);
观察者模式带来的好处很明显,这个模式中的两方都可以专心做一件事,而且可以任意组合,也就是说,复杂的问题被分解成三个小问题:
Observable
对象的工作subscribe
的参数来决定subscribe
迭代者(Iterator,也称为“迭代器”)指的是能够遍历一个数据集合的对象,因为数据集合的实现方式很多,可以是一个数组,也可以是一个树形结构,也可以是一个单向链表……迭代器的作用就是提供一个通用的接口,让使用者完全不用关心这个数据集合的具体实现方式。
设计模式的实现方式很多,但是不管对应的函数如何命名,通常都应该包含这样几个函数:
getCurrent
,获取当前被游标所指向的元素moveToNext
,将游标移动到下一个元素,调用这个函数之后,getCurrent
获得的元素就会不同isDone
,判断是否已经遍历完所有的元素const iterator = getIterator();
while (!iterator.isDone()) {
const current = iterator.getCurrent();
// do something with current
iterator.moveToNext();
}
在 RxJS 中,作为迭代器的使用者,并不需要主动去从
Observable
中“拉”数据,而是只要subscribe
上Observable
对象之后,自然就能够收到消息的推送,这就是观察者模式和迭代器两种模式结合的强大之处。
import { Observable } from 'rxjs/Observable';
// 作为参数传给 Observable 构造函数
// 接受一个 observer 参数,在函数体内调用参数 observer 的 next 函数将数据“推”给 Observer
const onSubscribe = observer => {
observer.next(1);
observer.next(2);
observer.next(3);
};
// 调用 Observable 构造函数,产生数据流 source$
// onSubscribe 会等待 source$ 的 subscribe 被调用
const source$ = new Observable(onSubscribe);
// 观察者 theObserver
const theObserver = {
next: item => console.log(item),
};
// 通过 subscribe 函数将 theObserver 和 source$ 关联起来
// subscribe 调用过程中,会对 theObserver 进行包装(或者代理)作为 onSubscribe 的参数,并且会调用 onSubscribe 函数
source$.subscribe(theObserver);
// 1
// 2
// 3
创建 Observable
对象也就是创建一个“发布者”,一个“观察者”调用某个 Observable
对象的 subscribe
函数,对应的 onSubscribe
函数就会被调用,参数就是“观察者”对象,onSubscribe
函数中可以任意操作“观察者”对象。这个过程,就等于在这个 Observable
对象上挂了号,以后当这个 Observable
对象产生数据时,观察者就会获得通知。
在 RxJS 中,Observable
是一个特殊类,它接受一个处理 Observer
的函数,而 Observer
就是一个普通的对象,没有什么神奇之处,对 Observer
对象的要求只有它必须包含一个名为 next
的属性,这个属性的值是一个函数,用于接收被“推”过来的数据。
Observer
是被“推”数据的,在执行过程中处于被动地位,所以,控制节奏的事情,还是应该交给 Observable
来做,Observable
既然能够“推”数据,那同时负责推送数据的节奏,完全合理。
const onSubscribe = observer => {
let number = 1;
const handle = setInterval(() => {
observer.next(number);
number++;
if (number > 3) {
clearInterval(handle);
}
}, 1000);
};
Observable
推送数据可以有时间间隔的特性使得异步操作十分容易,因为对于观察者 Observer
,只需要被动接受推送数据来处理,而不用关心数据何时产生。
Observable
对象中吐出来的数据可以是无穷的。
const onSubscribe = observer => {
let number = 1;
const handle = setInterval(() => {
observer.next(number);
number++;
}, 1000);
};
假如不中断程序,一直运行,程序并不会消耗更多内存,因为
Observable
每次吐出的数据被Observer
消费了,不会存在堆积。如果把数据堆积到一个数组中,然后挨个处理数组中的元素,内存消耗会随数组大小改变。
调用 Observer
的 next
只能表达“这是现在要推送的数据”,next
没法表达“已经没有更多数据了”,所以,为了让 Observable
有机会告诉 Observer
“已经没有更多数据了”,需要有另外一种通信机制,在 RxJS 中,实现这种通信机制用的就是 Observer
的 complete
函数。
const theObserver = {
next: value => console.log(value),
complete: () => console.log('complete')
};
对应的 Observable
对象需要显示调用 complete
函数
const onSubscribe = observer => {
let number = 1;
const handle = setInterval(() => {
observer.next(number);
number++;
if (number > 3) {
clearInterval(handle);
observer.complete();
}
}, 1000);
};
import { Observable } from 'rxjs/Observable';
const onSubscribe = observer => {
observer.next(1);
observer.error('error');
observer.complete();
};
const source$ = new Observable(onSubscribe);
const theObserver = {
next: value => console.log(value),
error: error => console.log(error),
complete: () => console.log('complete')
};
source$.subscribe(theObserver);
// 1
// error
在 RxJS 中,一个 Observable
对象只有一种终结状态,要么是完结(complete),要么是出错(error),一旦进入出错状态,这个 Observable
对象也就终结了,再不会调用对应 Observer
的 next
函数,也不会再调用 Observer
的 complete
函数;同样,如果一个 Observable
对象进入了完结状态,也不能再调用 Observer
的 next
和 error
。
Observer
对象都是一个对象,可以包含 next
、complete
和 error
三个方法,用于接受 Observable
的三种不同事件,如果根本不关心某种事件的话,也可以不实现对应的方法。
为了让代码更加简洁,没有必要创造一个 Observer
对象,subscribe
也可以直接接受函数作为参数,第一个参数如果是函数类型,就被认为是 next
,第二个函数参数被认为是 error
,第三个函数参数被认为是 complete
:
source$.subscribe(
value => console.log(value),
error => console.log(error),
() => console.log('complete')
);
如果不关心异常处理,但关心结束:
source$.subscribe(
value => console.log(value),
null,
() => console.log('complete')
);
onSubscribe
函数可以返回一个对象,对象上可以有一个 unsubscribe
函数
const onSubscribe = observer => {
let number = 1;
const handler = setInterval(() => {
observer.next(number++);
}, 1000);
return {
unsubscribe: () => clearInterval(handler)
};
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(
items => console.log(items)
);
setTimeout(() => {
subscription.unsubscribe();
}, 3500);
虽然 unsubscribe
函数调用之后,作为 Observer
不再接受到被推送的数据,但是作为 Observable
的 source$
并没有终结,因为始终没有调用 complete
,只不过它再也不会调用 next
函数了。
const onSubscribe = observer => {
let number = 1;
const handle = setInterval(() => {
console.log('in onSubscribe', number);
observer.next(number++);
}, 1000);
return {
unsubscribe: () => {
// clearInterval(handle);
},
};
};
// inSubscribe 1
// 1
// inSubscribe 2
// 2
// inSubscribe 3
// 3
// inSubscribe 4
// inSubscribe 5
// inSubscribe 6
Observable 对象 source 在 unsubscribe 函数调用之后依然在不断调用 next 函数,但是,unsubscribe 已经断开了 source 对象和 Observer 的连接,所以,之后无论 onSubscribe 中如何调用 next,Observer 都不会作出任何响应。
这是 RxJS 中很重要的一点:Observable
产生的事件,只有 Observer
通过 subscribe
订阅之后才会收到,在 unsubscribe
之后就不会再收到。
假设有这样的场景,一个 Observable
对象有两个 Observer
对象来订阅,而且这两个 Observer
对象并不是同时订阅,第一个 Observer
对象订阅N秒钟之后,第二个 Observer
对象才订阅同一个 Observable
对象,而且,在这 N 秒钟之内,Observable
对象已经吐出了一些数据。现在问题来了,后订阅上的 Observer
,是不是应该接收到“错过”的那些数据呢?
Observable
产生的数据就行Observable
之前产生的数据RxJS 考虑到了这两种不同场景的特点,让 Observable
支持这两种不同的需求,对应于选择 A,称这样的 Observable
为 Hot Observable,对于选择 B,称之为 Cold Observable。
如果设想有一个数据“生产者”(producer)的角色,那么,对于 Cold Observable,每一次订阅,都会产生一个新的“生产者”。
每一个 Cold Observable 概念上都可以理解成对每一次 subscribe
都产生一个“生产者”,然后这个生产者产生的数据通过 next
函数传递给订阅的 Observer
:
const cold$ = new Observable(observer => {
const producer = new Producer();
// observer 去接受 producer 生产的数据
});
对于一个 Hot Observable,概念上是有一个独立于 Observable
对象的“生产者”,这个“生产者”的创建和 subscribe
调用没有关系,subscribe
调用只是让 Observer
连接上“生产者”而已:
const producer = new Producer();
const hot$ = new Observable(observer => {
// observer 去接受 producer 生产的数据
});
把 Observable
称为“发布者”(publisher)而不是“生产者”,有意回避了“生产者”这个词,就是因为在 Hot Observable 中,Observable
明显并不产生数据,只是数据的搬运工。
一个 Observable
是 Hot 还是 Cold,是“热”还是“冷”,都是相对于生产者而言的,如果每次订阅的时候,已经有一个热的“生产者”准备好了,那就是 Hot Observable,相反,如果每次订阅都要产生一个新的生产者,新的生产者就像汽车引擎一样刚启动时肯定是冷的,所以叫 Cold Observable。
对于现实中复杂的问题,并不会创造一个数据流之后就直接通过 subscribe
接上一个 Observer
,往往需要对这个数据流做一系列处理,然后才交给 Observer
。就像一个管道,数据从管道的一段流入,途径管道各个环节,当数据到达 Observer
的时候,已经被管道操作过,有的数据已经被中途过滤抛弃掉了,有的数据已经被改变了原来的形态,而且最后的数据可能来自多个数据源,最后 Observer
只需要处理能够走到终点的数据。
在 RxJS 中,组成数据管道的元素就是操作符,对于每一个操作符,链接的就是上游(upstream)和下游(downstream)。
对一个操作符来说,上游可能是一个数据源,也可能是其他操作符,下游可能是最终的观察者,也可能是另一个操作符,每一个操作符之间都是独立的,正因为如此,所以可以对操作符进行任意组合,从而产生各种功能的数据管道。
在 RxJS 中,有一系列用于产生 Observable
函数,这些函数有的凭空创造 Observable
对象,有的根据外部数据源产生 Observable
对象,更多的是根据其他的 Observable
中的数据来产生新的 Observable
对象,也就是把上游数据转化为下游数据,所有这些函数统称为操作符。
import { Observable } from 'rxjs/Observable';
import 'rxjs/operators/map';
const onSubscribe = observer => {
observer.next(1);
observer.next(2);
observer.next(3);
};
const source$ = Observable.create(onSubscribe);
// 对于 Observable 的 map,是对其中每一个数据映射为一个新的值,产生一个新的 Observable 对象
source$.map(x => x * x).subscribe(console.log);
// 1
// 4
// 9
总之,操作符就是用来产生全新Observable对象的函数。
根据弹珠图的传统,竖杠符号|
代表的是数据流的完结,对应调用下游的 complete
函数。符号 ×
代表数据流中的异常,对应于调用下游的 error
函数。
为了描述操作符的功能,弹珠图中往往会出现多条时间轴,因为大部分操作符的工作都是把上游的数据转为传给下游的数据,在弹珠图上必须把上下游的数据流都展示出来。
可以在 RxJS Marbles (opens new window) 和 RxViz (opens new window) 查看和编写弹珠图。