前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出 RxJS 之 创建数据流

深入浅出 RxJS 之 创建数据流

作者头像
Cellinlab
发布2023-05-17 20:12:23
2.2K0
发布2023-05-17 20:12:23
举报
文章被收录于专栏:Cellinlab's BlogCellinlab's Blog

# 创建类操作符

功能需求

适用操作符

直接操作观察者

create

根据有限的数据产生同步数据流

of

产生一个数值范围内的数据

range

以循环方式产生数据

generate

重复产生数据流中的数据

repeat 和 repeatWhen

产生空数据流

empty

产生直接出错的数据流

throw

产生永不完结的数据流

never

间隔给定时间持续产生数据

interval 和 timer

从数组等枚举类型数据产生数据流

from

从 Promise 对象产生数据流

fromPromise

从外部事件对象产生数据流

fromEvent 和 fromEventPattern

从 Ajax 请求结果产生数据流

ajax

延迟产生数据流

defer

所谓创建类操作符,就是一些能够创造出一个 Observable 对象的方法,所谓“创造”,并不只是说返回一个 Observable 对象,因为任何一个操作符都会返回 Observable 对象,这里所说的创造,是指这些操作符不依赖于其他 Observable 对象,这些操作符可以凭空或者根据其他数据源创造出一个 Observable 对象。

创建类操作符并不是不需要任何输入,很多创建型的操作符都接受输入参数,有的还需要其他的数据源,比如浏览器的 DOM 结构或者 WebSocket 。重要的是,创建类操作符往往不会从其他 Observable 对象获取数据,在数据管道中,创建类操作符就是数据流的源头。因为创建类操作符的这个特性,创建类操作符大部分(并不是全部)都是静态操作符。

对于应用开发工程师,应该尽量使用创建类操作符,避免直接利用 Observable 的构造函数来创造 Observable 对象,RxJS 提供的创建类操作符覆盖了几乎所有的数据流创建模式,没有必要重复发明轮子。在很多场景下,开发者自己用构造函数创造 Observable 对象可能需要写很多代码,使用 RxJS 提供的创建类操作符可能只需要一行就能搞定。

# 创建同步数据流

同步数据流,或者说同步 Observable 对象,需要关心的就是:

  • 产生哪些数据
  • 数据之间的先后顺序如何

对于同步数据流,数据之间的时间间隔不存在,所以不需要考虑时间方面的问题。

# create

create 是最简单的一个操作符,因为它的功能很简单,就是直接调用 Observable 的构造函数:

代码语言:javascript
复制
Observable.create = function (subscribe) {
  return new Observable(subscribe);
}

# of:列举数据

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';

// 产生包含三个正整数的 Observable 对象
const source$ = Observable.of(1, 2, 3);

// source$ 被 subscribe 后会将参数依次吐出来
// 然后调用 Observer 的 complete 方法
source$.subscribe(
  value => console.log(value),
  err => console.error(err),
  () => console.log('complete')
);

source$ 被订阅时,吐出数据的过程是同步的,也就是没有任何时间上的间隔。

代码语言:javascript
复制
const source$ = Observable.of(1);

of 产生的是 Cold Observable,对于每一个 Observer 都会重复吐出同样的一组数据,所以可以反复使用。

适合使用 of 的场合是已知不多的几个数据,想要把这些数据用 Observable 对象来封装,然后就可以利用 RxJS 强大的数据管道功能来处理,而且,也不需要这些数据的处理要有时间间隔,这就用得上 of 了。

# range:指定范围

range 的含义就是“范围”,只需要指定一个范围的开始值和长度,range 就能够产生这个范围内的数字序列。

代码语言:javascript
复制
// 产生一个从 1 到 100 的所有正整数构成的数据流
const source$ = Observable.range(1, 100);

of 一样,range 以同步的方式吐出数据,也就是 100 个数据依次无时间间隔一口气全推给 Observer,然后调用 Observercomplete 函数。

RxJS 提供的每个操作符都包含尽量简洁的功能,但是通过多个操作符的组合,就可以提供复杂的功能。虽然 range 不支持递增序列的定制,但是可以通过 rangemap 的组合来实现。

代码语言:javascript
复制
const source$ = Observable.of(1, 2, 3).map(x => x * 2);

# generate:循环创建

generate 类似一个 for 循环,设定一个初始值,每次递增这个值,直到满足某个条件的时候才中止循环,同时,循环体内可以根据当前值产生数据。

代码语言:javascript
复制
const result = [];
for (let i = 2; i < 10; i+=2) {
  result.push(i*i);
}

// 用 generate 实现上面的代码
const source$ = Observable.generate(
  2, // 初始值
  x => x < 10, // 条件
  x => x + 2, // 递增
  x => x * x // 结果
);

使用 generate,四个参数分别对应了 for 循环中的不同表达式,其中,除了第一个参数是一个值之外,其余三个参数都是函数,应该保持这三个参数都是纯函数,这样才符合函数式编程的原则。

generate 可以有很大的自由度,可以通过 generate 产生需求复杂的数据,实际上,可以通过 generate 来实现 range 的功能:

代码语言:javascript
复制
const range = (start, count) => {
  const max = start + count;
  return Observable.generate(
    start, // 初始值
    x => x < max, // 条件
    x => x + 1, // 递增
    x => x // 结果
  );
};

在传统的 JavaScript 编程中,如果某个问题的解决方法是用一个 for 循环产生的数据集合,那么搬到 RxJS 的世界,就适合于使用 generate 来产生一个 Observable 对象。

# repeat:重复数据的数据流

repeat 的功能是可以重复上游 Observable 中的数据若干次。

代码语言:javascript
复制
const source$ = Observable.of(1, 2, 3);
const repeated$ = source$.repeat(10); // 重复 source$ 中的数据 10 次

repeated 是一个全新的 Observable 对象,它并没有改变 source ,source 自始至终还是只产生 1、2、3 然后就结束的数据流,在 repeat 的作用下,source 实际上被 subscribe 了 10 次,这 10 次 source 吐出的数据全部都变成了 repeated 吐出的数据。

值得注意的是,repeat 只有在上游 Observable 对象完结之后才会重新订阅,因为在完结之前,repeat 也不知道会不会有新的数据从上游被推送下来。

因为 repeat 的“重复”功能依赖于上游的完结时机,所以,使用 repeat 很重要的一点,就是保证上游 Observable 对象最终一定会完结,不然使用 repeat 就没有意义。

此外,repeat 的参数代表重复次数,如果不传入这个参数,或者传入参数为负数,那就代表无限次的重复,除非预期得到一个无限循环的数据流,不然应该给 repeat 一个正整数参数,这样才能保证 repeat 产生的 Observable 对象有完结的时候。

# 三个极简的操作符:empty、never 和 throw

  1. empty

empty 就是产生一个直接完结的 Observable 对象,没有参数,不产生任何数据,直接完结。

代码语言:javascript
复制
import 'rxjs/add/observable/empty';
const source$ = Observable.empty();

  1. throw

throw 产生的 Observable 对象也是什么都不做,直接出错,抛出的错误就是 throw 的参数

代码语言:javascript
复制
import 'rxjs/add/observable/throw';
const source$ = Observable.throw(new Error('Oops'));

source$ 代表的 Observable 对象不会产生任何数据,一开始就会直接给下游传递一个 Error 对象。

  1. never

never 产生的 Observable 对象什么都不做,既不吐出数据,也不完结,也不产生错误,就这样待着,一直到永远。

代码语言:javascript
复制
import 'rxjs/add/observable/never';
const source$ = Observable.never();

# 创建异步数据的 Observable 对象

异步数据流,或者说异步 Observable 对象,不光要考虑产生什么数据,还要考虑这些数据之间的时间间隔问题, RxJS 提供的操作符就是要让开发者在日常尽量不要考虑时间因素。

# interval 和 timer:定时产生数据

interval 接受一个数值类型的参数,代表产生数据的间隔毫秒数,返回的 Observable 对象就按照这个时间间隔输出递增的整数序列,从 0 开始。比如,interval 的参数是 1000,那么,产生的 Observable 对象在被订阅之后,在1秒钟的时刻吐出数据 0,在 2 秒钟的时刻吐出数据 1,在 3 秒钟的时刻吐出数据 2……

代码语言:javascript
复制
import 'rxjs/add/observable/interval';
const source$ = Observable.interval(1000);

注意到这个弹珠图中没有完结符号,表示这个数据流不会完结,因为 interval 不会主动调用下游的 complete ,要想停止这个数据序列,就必须要做退订的动作。

在 RxJS 中,每个操作符都尽量功能精简,所以 interval 并没有参数用来定制数据序列的起始值,要解决复杂问题,应该用多个操作符的组合,而不是让一个操作符的功能无限膨胀。

代码语言:javascript
复制
// 从 1 递增的数据序列
const source$ = Observable.interval(1000);
const result$ = source$.map(x => x + 1);

interval 就是 RxJS 世界中的 setInterval ,区别只是 setInterval 定时调用一个函数,而 interval 返回的 Observable 对象定时产生一个数据。

timer 的第一个参数可以是一个数值,也可以是一个 Date 类型的对象。如果第一个参数是数值,代表毫秒数,产生的 Observable 对象在指定毫秒之后会吐出一个数据 0 ,然后立刻完结。

代码语言:javascript
复制
import 'rxjs/add/observable/timer';
const source$ = Observable.timer(1000);

timer 还支持第二个参数,如果使用第二个参数,那就会产生一个持续吐出数据的 Observable 对象,类似 interval 的数据流。第二个参数指定的是各数据之间的时间间隔,从被订阅到产生第一个数据 0 的时间间隔,依然由第一个参数决定。

代码语言:javascript
复制
// 2 s 吐出 0,3 s 吐出 1,4 s 吐出 2 ...
const source$ = Observable.timer(2000, 1000);

如果 timer 的第一个参数和第二个参数一样,那就和 interval 的功能完全一样了。

# from:可把一切转化为 Observable

from 可能是创建类操作符中包容性最强的一个了,因为它接受的参数只要“像” Observable 就行,然后根据参数中的数据产生一个真正的 Observable 对象。

“像” Observable 的对象很多,一个数组就像 Observable ,一个不是数组但是“像”数组的对象也算,一个字符串也很像 Observable ,一个 JavaScript 中的 generator 也很像 Observable ,一个 Promise 对象也很像,所以, from 可以把任何对象都转化为 Observable 对象。

代码语言:javascript
复制
import 'rxjs/add/observable/from';

const source$ = Observable.from([1, 2, 3]);

# fromPromise:异步处理的交接

如果 from 的参数是 Promise 对象,那么这个 Promise 成功结束, from 产生的 Observable 对象就会吐出 Promise 成功的结果,并且立刻结束:

代码语言:javascript
复制
const promise = Promise.resolve('good');
const source$ = Observable.fromPromise(promise);

source$.subscribe(
  console.log,
  err => console.error(err),
  () => console.log('complete')
);
// good
// complete

Promise 对象虽然也支持异步操作,但是它只有一个结果,所以当 Promise 成功完成的时候, from 也知道不会再有新的数据了,所以立刻完结了产生的 Observable 对象。

# fromEvent

fromEvent 的第一个参数是一个事件源,在浏览器中,最常见的事件源就是特定的 DOM 元素,第二个参数是事件的名称,对应 DOM 事件就是 clickmousemove 这样的字符串。

代码语言:javascript
复制
<div>
  <button id="ClickMe">Click me</button>
  <div id="result"></div>
</div>
<script>
  let clickCount = 0;
  const event$ = Rx.Observable.fromEvent(
    document.querySelector('#ClickMe'),
    'click'
  );
  event$.subscribe(() => {
    clickCount++;
    document.querySelector('#result').innerText = clickCount;
  });
</script>

fromEvent 除了可以从 DOM 中获得数据,还可以从 Node.js 的 events 中获得数据:

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import EventEmitter from 'events';
import 'rxjs/add/observable/fromEvent';

const emitter = new EventEmitter();
const source$ = Observable.fromEvent(emitter, 'msg');

source$.subscribe(
  msg => console.log(msg),
  err => console.error(err),
  () => console.log('complete')
);

emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');
emitter.emit('msg-1', 'hello');
emitter.emit('msg', '!');

// hello
// world
// !

fromEvent 产生的是 Hot Observable,也就是数据的产生和订阅是无关的,如果在订阅之前调用 emitter.emit ,那有没有 Observer 这些数据都会立刻吐出来,等不到订阅的时候,当添加了 Observer 的时候,自然什么数据都获得不到。

# fromEventPattern

fromEventPattern 接受两个函数参数,分别对应产生的 Observable 对象被订阅和退订时的动作,因为这两个参数是函数,具体的动作可以任意定义,所以可以非常灵活。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';
import EventEmitter from 'events';

const emitter = new EventEmitter();

const addHandler = (handler) => {
  emitter.addListener('msg', handler);
};
const removeHandler = (handler) => {
  emitter.removeListener('msg', handler);
};

const source$ = Observable.fromEventPattern(addHandler, removeHandler);

const subscription = source$.subscribe(
  msg => console.log(msg),
  err => console.error(err),
  () => console.log('complete')
);

emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');

subscription.unsubscribe();
emitter.emit('msg', '!');

// hello
// world

fromEventPattern 提供的就是一种模式,不管数据源是怎样的行为,最后的产出都是一个 Observable 对象,对一个 Observable 对象交互的两个重要操作就是 subscribeunsubscribe ,所以, fromEventPattern 设计为这样,当 Observable 对象被 subscribe 时第一个函数参数被调用,被 unsubscribe 时第二个函数参数被调用。

# ajax

代码语言:javascript
复制
<div>
  <button id="getStar">Get RxJS Star Count</button>
  <div id="result"></div>
</div>
<script>
  Rx.Observable.fromEvent(
    document.querySelector('#getStar'),
    'click'
  ).subscribe(
    () => {
      Rx.Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', {
        responseType: 'json'
      }).subscribe(value => {
        const startCount = value.response.stargazers_count;
        document.querySelector('#result').innerText = startCount;
      });
    }
  );
</script>

# repeatWhen

repeat 能够反复订阅上游的 Observable ,但是并不能控制订阅的时间,比如希望在接收到上游完结事件的时候等待一段时间再重新订阅,这样的功能 repeat 无法做,但是 repeatWhen 可以满足上面描述的需求。

repeatWhen 接受一个函数作为参数,这个函数在上游第一次产生异常时被调用,然后这个函数应该返回一个 Observable 对象,这个对象就是一个控制器,作用就是控制 repeatWhen 何时重新订阅上游,当控制器 Observable 吐出一个数据的时候, repeatWhen 就会做退订上游并重新订阅的动作。

用一个 Observable 对象来控制另一个 Observable 对象中数据的产生,这是 RxJS 中的一个常见模式。

代码语言:javascript
复制
const notifier = () => {
  return Observable.interval(1000);
};

const source$ = Observable.of(1 , 2, 3);
const repeated$ = source$.repeatWhen(notifier);
// 每隔 1 秒钟,repeat$ 会重复订阅上游 source$,吐出的数据虽然是1、2、3序列的循环
// 但是每次循环之间间隔1秒钟。

如果 repeatWhen 的上游并不是同步产生数据,完结的时机也完全不能确定,如果想要每次在上游完结之后重新订阅,那使用 interval 来控制重新订阅的节奏就无法做到准确了,这时候就需要用到 notifier 函数的参数:

代码语言:javascript
复制
const notifier = (notification$) => {
  return notification$.delay(2000);
};

const repeated$ = source$.repeatWhen(notifier);
// 每当 repeatWhen 上游完结的时候,这个 notificaton$ 就会吐出一个数据

notifier 的参数实际上一种特殊的 Observable 对象,它既是 Observable 也是 Observer ,在 RxJS 中被称为 Subject

# defer

数据源头的 Observable 需要占用资源,像 fromEventajax 这样的操作符,还需要外部资源,所以在 RxJS 中,有时候创建一个 Observable 的代价不小,所以,希望能够尽量延迟对应 Observable 的创建,但是从方便代码的角度,又希望有一个 Observable 预先存在,这样能够方便订阅。

一方面希望 Observable 不要太早创建,另一方面又希望 Observable 尽早创建,这是一个矛盾的需求,解决这个矛盾需求的方式,就是依然创建一个 Observable 。但这个 Observable 只是一个代理(Proxy),在创建之时并不会做分配资源的工作,只有当被订阅的时候,才会去创建真正占用资源的 Observable ,之前产生的代理 Observable 会把所有工作都转交给真正占用资源的 Observable 。在 RxJS 中,defer 这个操作符实现的就是这种模式。

defer 接受一个函数作为参数,当 defer 产生的 Observable 对象被订阅的时候, defer 的函数参数就会被调用,预期这个函数会返回另一个 Observable 对象,也就是 defer 转嫁所有工作的对象。因为 PromiseObservable 的关系, defer 也很贴心地支持返回 Promise 对象的函数参数,当参数函数返回 Promise 对象的时候,省去了应用层开发者使用 fromPromise 转化一次的劳动。

代码语言:javascript
复制
import 'rxjs/add/observable/defer';
import 'rxjs/add/observable/of';

const observableFactory = () => Observable.of(1, 2, 3);

const source$ = Observable.defer(observableFactory);
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022/7/20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # 创建类操作符
  • # 创建同步数据流
    • # create
      • # of:列举数据
        • # range:指定范围
          • # generate:循环创建
            • # repeat:重复数据的数据流
              • # 三个极简的操作符:empty、never 和 throw
              • # 创建异步数据的 Observable 对象
                • # interval 和 timer:定时产生数据
                  • # from:可把一切转化为 Observable
                    • # fromPromise:异步处理的交接
                      • # fromEvent
                        • # fromEventPattern
                          • # ajax
                            • # repeatWhen
                              • # defer
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档