功能需求 | 适用操作符 |
---|---|
直接操作观察者 | create |
根据有限的数据产生同步数据流 | of |
产生一个数值范围内的数据 | range |
以循环方式产生数据 | generate |
重复产生数据流中的数据 | repeat 和 repeatWhen |
产生空数据流 | empty |
产生直接出错的数据流 | throw |
产生永不完结的数据流 | never |
间隔给定时间持续产生数据 | interval 和 timer |
从数组等枚举类型数据产生数据流 | from |
从 Promise 对象产生数据流 | fromPromise |
从外部事件对象产生数据流 | fromEvent 和 fromEventPattern |
从 Ajax 请求结果产生数据流 | ajax |
延迟产生数据流 | defer |
所谓创建类操作符,就是一些能够创造出一个 Observable
对象的方法,所谓“创造”,并不只是说返回一个 Observable
对象,因为任何一个操作符都会返回 Observable
对象,这里所说的创造,是指这些操作符不依赖于其他 Observable
对象,这些操作符可以凭空或者根据其他数据源创造出一个 Observable
对象。
创建类操作符并不是不需要任何输入,很多创建型的操作符都接受输入参数,有的还需要其他的数据源,比如浏览器的 DOM 结构或者 WebSocket 。重要的是,创建类操作符往往不会从其他 Observable
对象获取数据,在数据管道中,创建类操作符就是数据流的源头。因为创建类操作符的这个特性,创建类操作符大部分(并不是全部)都是静态操作符。
对于应用开发工程师,应该尽量使用创建类操作符,避免直接利用 Observable
的构造函数来创造 Observable
对象,RxJS 提供的创建类操作符覆盖了几乎所有的数据流创建模式,没有必要重复发明轮子。在很多场景下,开发者自己用构造函数创造 Observable
对象可能需要写很多代码,使用 RxJS 提供的创建类操作符可能只需要一行就能搞定。
同步数据流,或者说同步 Observable
对象,需要关心的就是:
对于同步数据流,数据之间的时间间隔不存在,所以不需要考虑时间方面的问题。
create
是最简单的一个操作符,因为它的功能很简单,就是直接调用 Observable
的构造函数:
Observable.create = function (subscribe) {
return new Observable(subscribe);
}
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
// 产生包含三个正整数的 Observable 对象
const source$ = Observable.of(1, 2, 3);
// source$ 被 subscribe 后会将参数依次吐出来
// 然后调用 Observer 的 complete 方法
source$.subscribe(
value => console.log(value),
err => console.error(err),
() => console.log('complete')
);
source$
被订阅时,吐出数据的过程是同步的,也就是没有任何时间上的间隔。
const source$ = Observable.of(1);
of
产生的是 Cold Observable,对于每一个 Observer
都会重复吐出同样的一组数据,所以可以反复使用。
适合使用 of
的场合是已知不多的几个数据,想要把这些数据用 Observable
对象来封装,然后就可以利用 RxJS 强大的数据管道功能来处理,而且,也不需要这些数据的处理要有时间间隔,这就用得上 of
了。
range
的含义就是“范围”,只需要指定一个范围的开始值和长度,range
就能够产生这个范围内的数字序列。
// 产生一个从 1 到 100 的所有正整数构成的数据流
const source$ = Observable.range(1, 100);
和 of
一样,range
以同步的方式吐出数据,也就是 100 个数据依次无时间间隔一口气全推给 Observer
,然后调用 Observer
的 complete
函数。
RxJS 提供的每个操作符都包含尽量简洁的功能,但是通过多个操作符的组合,就可以提供复杂的功能。虽然 range
不支持递增序列的定制,但是可以通过 range
和 map
的组合来实现。
const source$ = Observable.of(1, 2, 3).map(x => x * 2);
generate
类似一个 for
循环,设定一个初始值,每次递增这个值,直到满足某个条件的时候才中止循环,同时,循环体内可以根据当前值产生数据。
const result = [];
for (let i = 2; i < 10; i+=2) {
result.push(i*i);
}
// 用 generate 实现上面的代码
const source$ = Observable.generate(
2, // 初始值
x => x < 10, // 条件
x => x + 2, // 递增
x => x * x // 结果
);
使用 generate
,四个参数分别对应了 for
循环中的不同表达式,其中,除了第一个参数是一个值之外,其余三个参数都是函数,应该保持这三个参数都是纯函数,这样才符合函数式编程的原则。
用 generate
可以有很大的自由度,可以通过 generate
产生需求复杂的数据,实际上,可以通过 generate
来实现 range
的功能:
const range = (start, count) => {
const max = start + count;
return Observable.generate(
start, // 初始值
x => x < max, // 条件
x => x + 1, // 递增
x => x // 结果
);
};
在传统的 JavaScript 编程中,如果某个问题的解决方法是用一个 for
循环产生的数据集合,那么搬到 RxJS 的世界,就适合于使用 generate
来产生一个 Observable
对象。
repeat
的功能是可以重复上游 Observable
中的数据若干次。
const source$ = Observable.of(1, 2, 3);
const repeated$ = source$.repeat(10); // 重复 source$ 中的数据 10 次
repeated 是一个全新的 Observable 对象,它并没有改变 source ,source 自始至终还是只产生 1、2、3 然后就结束的数据流,在 repeat 的作用下,source 实际上被 subscribe 了 10 次,这 10 次 source 吐出的数据全部都变成了 repeated 吐出的数据。
值得注意的是,repeat
只有在上游 Observable
对象完结之后才会重新订阅,因为在完结之前,repeat
也不知道会不会有新的数据从上游被推送下来。
因为 repeat
的“重复”功能依赖于上游的完结时机,所以,使用 repeat
很重要的一点,就是保证上游 Observable
对象最终一定会完结,不然使用 repeat
就没有意义。
此外,repeat
的参数代表重复次数,如果不传入这个参数,或者传入参数为负数,那就代表无限次的重复,除非预期得到一个无限循环的数据流,不然应该给 repeat
一个正整数参数,这样才能保证 repeat
产生的 Observable
对象有完结的时候。
empty
empty
就是产生一个直接完结的 Observable
对象,没有参数,不产生任何数据,直接完结。
import 'rxjs/add/observable/empty';
const source$ = Observable.empty();
throw
throw
产生的 Observable
对象也是什么都不做,直接出错,抛出的错误就是 throw
的参数
import 'rxjs/add/observable/throw';
const source$ = Observable.throw(new Error('Oops'));
source$
代表的 Observable
对象不会产生任何数据,一开始就会直接给下游传递一个 Error
对象。
never
never
产生的 Observable
对象什么都不做,既不吐出数据,也不完结,也不产生错误,就这样待着,一直到永远。
import 'rxjs/add/observable/never';
const source$ = Observable.never();
异步数据流,或者说异步 Observable
对象,不光要考虑产生什么数据,还要考虑这些数据之间的时间间隔问题, RxJS 提供的操作符就是要让开发者在日常尽量不要考虑时间因素。
interval
接受一个数值类型的参数,代表产生数据的间隔毫秒数,返回的 Observable
对象就按照这个时间间隔输出递增的整数序列,从 0 开始。比如,interval
的参数是 1000,那么,产生的 Observable
对象在被订阅之后,在1秒钟的时刻吐出数据 0,在 2 秒钟的时刻吐出数据 1,在 3 秒钟的时刻吐出数据 2……
import 'rxjs/add/observable/interval';
const source$ = Observable.interval(1000);
注意到这个弹珠图中没有完结符号,表示这个数据流不会完结,因为 interval
不会主动调用下游的 complete
,要想停止这个数据序列,就必须要做退订的动作。
在 RxJS 中,每个操作符都尽量功能精简,所以 interval
并没有参数用来定制数据序列的起始值,要解决复杂问题,应该用多个操作符的组合,而不是让一个操作符的功能无限膨胀。
// 从 1 递增的数据序列
const source$ = Observable.interval(1000);
const result$ = source$.map(x => x + 1);
interval
就是 RxJS 世界中的 setInterval
,区别只是 setInterval
定时调用一个函数,而 interval
返回的 Observable
对象定时产生一个数据。
timer
的第一个参数可以是一个数值,也可以是一个 Date
类型的对象。如果第一个参数是数值,代表毫秒数,产生的 Observable
对象在指定毫秒之后会吐出一个数据 0
,然后立刻完结。
import 'rxjs/add/observable/timer';
const source$ = Observable.timer(1000);
timer
还支持第二个参数,如果使用第二个参数,那就会产生一个持续吐出数据的 Observable
对象,类似 interval
的数据流。第二个参数指定的是各数据之间的时间间隔,从被订阅到产生第一个数据 0 的时间间隔,依然由第一个参数决定。
// 2 s 吐出 0,3 s 吐出 1,4 s 吐出 2 ...
const source$ = Observable.timer(2000, 1000);
如果 timer
的第一个参数和第二个参数一样,那就和 interval
的功能完全一样了。
from
可能是创建类操作符中包容性最强的一个了,因为它接受的参数只要“像” Observable
就行,然后根据参数中的数据产生一个真正的 Observable
对象。
“像” Observable
的对象很多,一个数组就像 Observable
,一个不是数组但是“像”数组的对象也算,一个字符串也很像 Observable
,一个 JavaScript 中的 generator
也很像 Observable
,一个 Promise
对象也很像,所以, from
可以把任何对象都转化为 Observable
对象。
import 'rxjs/add/observable/from';
const source$ = Observable.from([1, 2, 3]);
如果 from 的参数是 Promise 对象,那么这个 Promise 成功结束, from 产生的 Observable 对象就会吐出 Promise 成功的结果,并且立刻结束:
const promise = Promise.resolve('good');
const source$ = Observable.fromPromise(promise);
source$.subscribe(
console.log,
err => console.error(err),
() => console.log('complete')
);
// good
// complete
Promise
对象虽然也支持异步操作,但是它只有一个结果,所以当 Promise
成功完成的时候, from
也知道不会再有新的数据了,所以立刻完结了产生的 Observable
对象。
fromEvent
的第一个参数是一个事件源,在浏览器中,最常见的事件源就是特定的 DOM 元素,第二个参数是事件的名称,对应 DOM 事件就是 click
、 mousemove
这样的字符串。
<div>
<button id="ClickMe">Click me</button>
<div id="result"></div>
</div>
<script>
let clickCount = 0;
const event$ = Rx.Observable.fromEvent(
document.querySelector('#ClickMe'),
'click'
);
event$.subscribe(() => {
clickCount++;
document.querySelector('#result').innerText = clickCount;
});
</script>
fromEvent
除了可以从 DOM 中获得数据,还可以从 Node.js 的 events
中获得数据:
import { Observable } from 'rxjs/Observable';
import EventEmitter from 'events';
import 'rxjs/add/observable/fromEvent';
const emitter = new EventEmitter();
const source$ = Observable.fromEvent(emitter, 'msg');
source$.subscribe(
msg => console.log(msg),
err => console.error(err),
() => console.log('complete')
);
emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');
emitter.emit('msg-1', 'hello');
emitter.emit('msg', '!');
// hello
// world
// !
fromEvent
产生的是 Hot Observable,也就是数据的产生和订阅是无关的,如果在订阅之前调用 emitter.emit
,那有没有 Observer
这些数据都会立刻吐出来,等不到订阅的时候,当添加了 Observer
的时候,自然什么数据都获得不到。
fromEventPattern
接受两个函数参数,分别对应产生的 Observable
对象被订阅和退订时的动作,因为这两个参数是函数,具体的动作可以任意定义,所以可以非常灵活。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';
import EventEmitter from 'events';
const emitter = new EventEmitter();
const addHandler = (handler) => {
emitter.addListener('msg', handler);
};
const removeHandler = (handler) => {
emitter.removeListener('msg', handler);
};
const source$ = Observable.fromEventPattern(addHandler, removeHandler);
const subscription = source$.subscribe(
msg => console.log(msg),
err => console.error(err),
() => console.log('complete')
);
emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');
subscription.unsubscribe();
emitter.emit('msg', '!');
// hello
// world
fromEventPattern
提供的就是一种模式,不管数据源是怎样的行为,最后的产出都是一个 Observable
对象,对一个 Observable
对象交互的两个重要操作就是 subscribe
和 unsubscribe
,所以, fromEventPattern
设计为这样,当 Observable
对象被 subscribe
时第一个函数参数被调用,被 unsubscribe
时第二个函数参数被调用。
<div>
<button id="getStar">Get RxJS Star Count</button>
<div id="result"></div>
</div>
<script>
Rx.Observable.fromEvent(
document.querySelector('#getStar'),
'click'
).subscribe(
() => {
Rx.Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', {
responseType: 'json'
}).subscribe(value => {
const startCount = value.response.stargazers_count;
document.querySelector('#result').innerText = startCount;
});
}
);
</script>
repeat
能够反复订阅上游的 Observable
,但是并不能控制订阅的时间,比如希望在接收到上游完结事件的时候等待一段时间再重新订阅,这样的功能 repeat
无法做,但是 repeatWhen
可以满足上面描述的需求。
repeatWhen
接受一个函数作为参数,这个函数在上游第一次产生异常时被调用,然后这个函数应该返回一个 Observable
对象,这个对象就是一个控制器,作用就是控制 repeatWhen
何时重新订阅上游,当控制器 Observable
吐出一个数据的时候, repeatWhen
就会做退订上游并重新订阅的动作。
用一个 Observable
对象来控制另一个 Observable
对象中数据的产生,这是 RxJS 中的一个常见模式。
const notifier = () => {
return Observable.interval(1000);
};
const source$ = Observable.of(1 , 2, 3);
const repeated$ = source$.repeatWhen(notifier);
// 每隔 1 秒钟,repeat$ 会重复订阅上游 source$,吐出的数据虽然是1、2、3序列的循环
// 但是每次循环之间间隔1秒钟。
如果 repeatWhen
的上游并不是同步产生数据,完结的时机也完全不能确定,如果想要每次在上游完结之后重新订阅,那使用 interval
来控制重新订阅的节奏就无法做到准确了,这时候就需要用到 notifier
函数的参数:
const notifier = (notification$) => {
return notification$.delay(2000);
};
const repeated$ = source$.repeatWhen(notifier);
// 每当 repeatWhen 上游完结的时候,这个 notificaton$ 就会吐出一个数据
notifier
的参数实际上一种特殊的Observable
对象,它既是Observable
也是Observer
,在 RxJS 中被称为Subject
。
数据源头的 Observable
需要占用资源,像 fromEvent
和 ajax
这样的操作符,还需要外部资源,所以在 RxJS 中,有时候创建一个 Observable
的代价不小,所以,希望能够尽量延迟对应 Observable
的创建,但是从方便代码的角度,又希望有一个 Observable
预先存在,这样能够方便订阅。
一方面希望 Observable
不要太早创建,另一方面又希望 Observable
尽早创建,这是一个矛盾的需求,解决这个矛盾需求的方式,就是依然创建一个 Observable
。但这个 Observable
只是一个代理(Proxy),在创建之时并不会做分配资源的工作,只有当被订阅的时候,才会去创建真正占用资源的 Observable
,之前产生的代理 Observable
会把所有工作都转交给真正占用资源的 Observable
。在 RxJS 中,defer
这个操作符实现的就是这种模式。
defer
接受一个函数作为参数,当 defer
产生的 Observable
对象被订阅的时候, defer
的函数参数就会被调用,预期这个函数会返回另一个 Observable
对象,也就是 defer
转嫁所有工作的对象。因为 Promise
和 Observable
的关系, defer
也很贴心地支持返回 Promise
对象的函数参数,当参数函数返回 Promise
对象的时候,省去了应用层开发者使用 fromPromise
转化一次的劳动。
import 'rxjs/add/observable/defer';
import 'rxjs/add/observable/of';
const observableFactory = () => Observable.of(1, 2, 3);
const source$ = Observable.defer(observableFactory);