功能需求 | 适用的操作符 |
---|---|
过滤掉不满足判定条件的数据 | filter |
获得满足判定条件的第一个数据 | first |
获得满足判定条件的最后一个数据 | last |
从数据流中选取最先出现的若干个数据 | take |
从数据流中选取最后出现的若干个数据 | takeLast |
从数据流中选取数据直到某种情况发生 | takeWhile 和 takeUntil |
从数据流中中忽略最先出现的若干数据 | skip |
基于时间的数据流量筛选 | throttleTime 、debounceTime 和 auditTime |
基于数据内容的数据流量筛选 | throttle 、debounce 和 audit |
基于采样方式的数据流量筛选 | sample 和 sampleTime |
删除重复的数据 | distinct |
删除重复的连续数据 | distinctUntilChanged 和 distinctUntilKeyChanged |
忽略数据流中的所有数据 | ignoreElements |
只选取指定出现位置的数据 | elementAt |
判断是否只有一个数据满足判定条件 | single |
过滤类操作符最基本的功能就是对一个给定的数据流中每个数据判断是否满足某个条件,如果满足条件就可以传递给下游,否则就抛弃掉。
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/filter';
const source$ = Observable.range(1, 5);
const even$ = source$.filter(x => x % 2 === 0);
even$.subscribe(x => console.log(x));
// 2
// 4
使用 filter
产生的 Observable
对象,产生数据的时机和上游是一致的,当上游产生数据的时候,只要这个数据满足判定条件,就会立刻被同步传给下游。
const source$ = Observable.interval(1000);
const even$ = source$.filter(x => x % 2 === 0);
当使用 first
不给任何判定函数时,就相当于找上游 Observable
吐出的第一个数据:
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/first';
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const first$ = source$.first();
// 3
如果修改上面的代码,给 first
一个判定函数参数,得到的结果就会不一样:
const first$ = source$.first(x => x % 2 == 0);
// 4
first
的第二个参数是可选参数,如果使用,发挥的就是“结果选择器”的作用。
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const first$ = source$.first(
x => x % 2 == 0,
(value, index) => [value, index]
);
// [4, 2]
如果 first
的上游 Observable
到完结时依然没有满足判定条件的数据,那么 first
会向下游抛出一个 error
;而 find
与 findIndex
没有匹配的数据就会吐出一个 undefined
。
不过,也可以设置默认值:
const first$ = source$.first(
x => x < 0,
f => f,
-1
);
last
这个操作符做的事情和 first
正相反,找的是一个 Observable
中最后一个判定条件的数据。
const source$ = Observable.of(3, 1, 4, 1, 5, 9, 2, 6);
const last$ = source$.last();
//
和 first
不同的是, last
无论如何都要等到上游 Observable
完结的时候才吐出数据,因为上游 Observable
完结之前, last
也无从知道是不是拿到了“最后一个”数据。
take
只支持一个参数 count
,也就是限定拿上游 Observable
的数据数量。
const source$ = Observable.interval(1000);
const last$ = source$.take(3);
takeLast
take
相当于一个可以获取多个数据的 first
,那么 takeLast
相当于一个可以获取多个数据的 last
。
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const last3$ = source$.takeLast(3);
// 1
// 5
// 9
如果上游在一段时间范围内产生的数据,那么就必须要等到上游完结 takeLast
产生的 Observable
对象才产生数据:
const source$ = Observable.interval(1000);
const take$ = source$.take(5);
const last3$ = take$.takeLast(3);
take
的作用是获取上游的数据,只要没有超过给定的数量限制,上游产生一个数据,take
都会立刻转手给下游。所以,弹珠图上 take
产生的 Observable
对象数据产生时刻和 source$
是一致的; takeLast
只有确定上游数据完结的时候才能产生数据,而且是一次性产生所有数据,即 takeLast
在 take
产生的 Observable
对象完结时把 2、3、4 数据一次性传给下游。
takeWhile
takeWhile
接受一个判定函数作为参数,这个判定函数有两个参数,分别代表上游的数据和对应的序号, takeWhile
会吐出上游数据,直到判定函数返回 false
,只要遇到第一个判定函数返回 false
的情况, takeWhile
产生的 Observable
就完结。
const source$ = Observable.range(1, 100);
const takeWhile$ = source$.takeWhile(x => x % 2 === 0);
在上面的例子中,takeWhile$
一个数据都不吐出就完结,因为上游 source$
吐出的第一个数据是1,不满足判定条件。
因为 takeWhile
的判定函数支持第二个序号参数,所以实际上可以利用 takeWhile
来实现 take
:
Observable.prototype.take = function (count) {
return this.takeWhile((x, i) => i < count);
};
take
和 filter
的组合
如果想要获得上游 Observable
满足条件的前 N
个数据,怎么办呢?
Observable.prototype.takeCountWhile = function (count, predicate) {
return this.filter(predicate).take(count);
};
注意,filter
并不是抽干上游 Observable
才传递数据给 take
,而是对上游每个数据都在用 predicate
判定通过之后,立刻传递给 take
。
const source$ = Observable.interval(1000);
const even$ = source$.takeCountWhile(2, x => x % 2 === 0);
takeUntil
takeUntil
是一个里程碑式的过滤类操作符,因为 takeUntil
让我们可以用 Observable
对象来控制另一个 Observable
对象的数据产生。
takeUntil
的神奇特点就是其参数是另一个 Observable
对象 notifier
,由这个 notifier
来控制什么时候结束从上游 Observable
拿数据,因为 notifier
本身又是一个 Observable
,吐出数据可以非常灵活,这就意味着可以利用非常灵活的规则用 takeUntil
产生下游 Observable
。
使用 takeUntil
,上游的数据直接转手给下游,直到(Until)参数 notifier
吐出一个数据或者完结,这个 notifier
就像一个水龙头开关,控制着 takeUntil
产生的 Observable
对象,一开始这个水龙头开关是打开状态,上游的数据像水一样直接流到下游,但是 notifier
只要一有动静,水龙头开关立刻关闭,上游通往下游的通道也就关闭了。
const source$ = Observable.interval(1000);
const notifier$ = Observable.timer(2500);
const takeUntil$ = source$.takeUntil(notifier$);
skip
skip
接受一个 count
参数,会默默忽略上游 Observable
吐出的前 count
个数据,然后,从第 count+1
个数据开始,就和上游 Observable
保持一致了,上游 Observable
吐出什么数据, skip
产生的 Observable
就吐出什么数据,上游 Observable
完结, skip
产生的 Observable
跟着完结。当然,如果上游吐出的数据不够 count
个,那 skip
产生的 Observable
就会在上游 Observable
完结的时候立刻完结。
const source$ = Observable.interval(1000);
const skip$ = source$.skip(3);
skipWhile
和 skipUntil
“回压”(Back Pressure)也称为“背压”,是一个源自于传统工程中的概念,在一个传输管道中,液体或者气体应该朝某一个方向流动,但是前方管道口径变小,这时候液体或者气体就会在管道中淤积,产生一个和流动方向相反的压力,因为这个压力的方向是往回走的,所以称为回压。
在 RxJS 的世界中,数据管道就像是现实世界中的管道,数据就像是现实中的液体或者气体,如果数据管道中某一个环节处理数据的速度跟不上数据涌入的速度,上游无法把数据推送给下游,就会在缓冲区中积压数据,这就相当于对上游施加了压力,这就是 RxJS 世界中的“回压”。
回压这种现象的根源是数据管道中某个环节数据涌入的速度超过了处理速度,那么,既然处理不过来,干脆就舍弃掉一些涌入的数据,这种方式称为“有损回压控制”(Lossy Backpressure Control),通过损失掉一些数据让流入和处理的速度平衡。
throttleTime
的作用是限制在 duration
时间范围内,从上游传递给下游数据的个数; debounceTime
的作用是让传递给下游的数据间隔不能小于给定的时间 dueTime
。
const source$ = Observable.interval(1000);
const result$ = source$.throttleTime(2000);
result$.subscribe(console.log);
// 0
// 2
// 4
const source$ = Observable.interval(1000);
const result$ = source$.debounceTime(2000);
因为 debounceTime
要等上游在 dueTime
毫秒范围内不产生任何其他数据时才把这个数据传递给下游,如果在 dueTime
范围内上游产生了新的数据,那么 debounceTime
就又要重新开始计时。
const source$ = Observable.interval(1000);
const filter$ = source$.filter(x => x % 3 === 0);
const result$ = filter$.debounceTime(2000);
使用 throttleTime
和 debounceTime
的一个常见场景就是用来减少不必要的 DOM 事件处理。
当数据流中可能有大量数据产生,希望一段时间内爆发的数据只有一个能够被处理到,这时候就应该使用 throttleTime
。
对于 debounceTime
,适用情况是,只要数据在以很快的速度持续产生时,那就不去处理它们,直到产生数据的速度降下来。
throttle
和 debounce
和不带 Time
后缀的兄弟操作符的区别是,这两个操作符不是用时间来控制流量,而是用 Observable
中的数据来控制流量。
throttle
的参数是一个函数,这个函数应该返回一个 Observable
对象,这个 Observable
对象可以决定 throttle
如何控制上游和下游之间的流量。
const source$ = Observable.interval(1000);
const durationSelector = (value) => {
console.log(`call durationSelector with ${value}`);
return Observable.timer(2000);
};
const result$ = source$.throttle(durationSelector);
result$.subscribe(console.log);
// call durationSelector with 0
// 0
// call durationSelector with 2
// 2
// call durationSelector with 4
// 4
当 source$
产生第一个数据 0
的时候, throttle
就和 throttleTime
一样,毫不犹豫地把这个数据 0
传给了下游,在此之前会用这个数据 0
作为参数调用 durationSelector ,然后订阅 durationSelector
返回的 Observable
对象,在这个 Observable
对象产生第一个对象之前,所有上游传过来的数据都会被丢弃,于是, source$
产生的数据 1
就被丢弃了,因为 durationSelector
返回的 Observable
对象被订阅之后 2000
毫秒才会产生数据。
durationSelector
产生 Observable
对象只有第一个产生的数据会有作用,而且这个数据的产生时机是关键,至于这个数据是个什么值反而不重要,在上面的例子中,使用 timer
来产生只有一个数据的 Observable
对象,当然也可以使用 interval
来产生多个数据的 Observable
对象,但是依然只有第一个数据起到作用。
如果 durationSelector
只是返回固定延时产生数据的 Observable
对象,那么 throttle
的功能就和 throttleTime
没有两样,不过, durationSelector
有参数,就是当前传给下游的数据,所以完全可以根据这个参数来产生更灵活的操作。
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
对于 debounce
,和 debounceTime
相比一样是用一个函数参数代替了数值参数,这样就可以产生更灵活的时间控制。
const source$ = Observable.interval(1000);
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
const result$ = source$.debounce(durationSelector);
durationSelector
函数返回的 Observable
第一个数据产生时间延迟取代了 debounceTime
的 dueTime
参数,决定了上游一个数据会被延迟多久传给下游,因为 3
的倍数延时 2000
毫秒,总是会被下一个数据“打断”,所以3的倍数总是进入不了下游。
可以认为 audit
是做 throttle
类似的工作,不同的是在“节流时间”范围内, throttle
把第一个数据传给下游, audit
是把最后一个数据传给下游。
const source$ = Observable.interval(1000);
const result$ = source$.auditTime(2000);
result$.subscribe(console.log);
// 1
// 3
// 5
audit
也接受 durationSelector
这样的函数参数:
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(
Observable.interval(1000).take(3).mapTo('B')
)
.concat(
Observable.interval(500).take(3).mapTo('C')
);
const durationSelector = value => {
return Observable.timer(800);
};
const result$ = source$.audit(durationSelector);
sample
是要根据规则在一个范围内取一个数据,抛弃其他数据。
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(
Observable.interval(1000).take(3).mapTo('B')
)
.concat(
Observable.interval(500).take(3).mapTo('C')
);
const result$ = source$.sampleTime(800);
表面上看 sampleTime
和 auditTime
非常像, auditTime
也会把时间块中最后一个数据推给下游,但是对于 auditTime
时间块的开始是由上游产生数据触发的,而 sampleTime
的时间块开始则和上游数据完全无关,所以,可以看到 sampleTime
产生的数据序列分布十分均匀。
注意,如果 sampleTime
发现一个时间块内上游没有产生数据,那在时间块结尾也不会给下游传递数据。
sample
的参数并不是一个返回 Observable
对象的函数,而就是一个简单的 Observable
对象。 sample
之所以这样设计,是因为对于“采样”这个动作,逻辑上可以认为和上游产生什么数据没有任何关系,所以不需要一个函数来根据数据产生 Observable
对象控制节奏,直接提供一个 Observable
对象就足够了。
通常 sample
的参数被称为 notifier
,当 notifier
产生一个数据的时候, sample
就从上游拿最后一个产生的数据传给下游。
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinct();
distinct$.subscribe(console.log);
// 0
// 1
// 2
// 3
distinct
提供了一个函数参数 keySelector
,用于定制 distinct
应该比对什么样的属性。
distinct
还有一个潜在的问题需要注意,如果上游产生的不同数据很多,那么可能会造成内存泄露。为了克服这个缺点,distinct
还提供第二个可选的参数 flush
,第二个参数可以是一个 Observable
对象,每当这个 Observable
对象产生数据时,distinct
就清空“唯一数据集合”,一切重来,这样就避免了内存泄露。
distinctUntilChanged
拿到一个数据不是和一个“唯一数据集合”比较,而是直接和上一个数据比较,也就是说,这个操作符要保存上游产生的上一个数据就足够,当然,也就没有了 distinct
潜在的内存泄露问题。
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinctUntilChanged();
distinct$.subscribe(console.log);
// 0
// 1
// 2
// 0
// 1
// 3
ignoreElments
就是要忽略所有的元素,这里的元素是指上游产生的数据,忽略所有上游数据,只关心 complete
和 error
事件。
const source$ = Observable.interval(1000).take(5);
const result$ = source$.ignoreElements();
elementAt
把上游数据当数组,只获取指定下标的那一个数据,就这个简单功能,使用 first
配合函数参数也一样能够实现。不过 elementAt
还有一个附加功能体现了自己的存在价值,它的第二个参数可以指定没有对应下标数据时的默认值。
single
这个操作符用来检查上游是否只有一个满足对应条件的数据,如果答案为“是”,就向下游传递这个数据;如果答案为“否”,就向下游传递一个异常。