从观察者模式到迭代器模式系统讲解 RxJS Observable(二)

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现。

Reactive Extensions(Rx)是对 LINQ 的一种扩展,他的目标是对异步的集合进行操作,也就是说,集合中的元素是异步填充的,比如说从 Web 或者云端获取数据然后对集合进行填充。

LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和 Visual Basic 语言的扩展。它允许编写 C# 或者 Visual Basic 代码以操作内存数据的方式,查询数据库。

在上文 从观察者模式到迭代器模式系统讲解 RxJS Observable(一)中,我们从观察者模式到迭代器模式,系统的讲解了 Observable 的实现原理和基本用法,这里我们将继续介绍 RxJS Observable 的操作符 Operators

RxJS 提供了多种用于操作 Observables 的操作符 OperatorsOperators 是一个函数,它们可以被分类为:创建转换过滤合并多播错误处理等。

创建类操作符 Creation Operators

创建类操作符(Creation Operators)可以作为独立的函数被调用,用于创建 Observable 对象。

官网列举的一些创建类操作符:

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

这里只列举其中几个操作符的用法。

Of

将参数转换为可观察序列,并作为单独的 next 通知发布(emit)出去。

import { of } from 'rxjs';

of(10, 20, 30).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
// result:
// 'next: 10'
// 'next: 20'
// 'next: 30'

from

可以把一个数组、类数组、Promise、以及 Iterable 对象 转化为 Observable,并通知发布该 promise、array 或 iterable 中的项。

import { from } from 'rxjs';

const array = [10, 20, 30];
const result = from(array);
result.subscribe(x => console.log(x));

// Logs:
// 10
// 20
// 30

fromEvent

创建一个 Observable,它发出来自给定事件目标的特定类型的事件。

fromEvent 接受第一个参数作为事件目标,它是事件的处理对象。第二个参数接收一个字符串,表示我们想要监听的事件类型。

// 在 document 对象上绑定 click 事件

import { fromEvent } from 'rxjs';

const clicks = fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

bindCallback

将处理回调的 API 转换为可以返回 Observable 对象的函数。bindCallback 本质上不是运算符,因为它的输入和输出不是 Observables。

// 将 jQuery的 getJSON API 转换为 Observable API

import { bindCallback } from 'rxjs';
import * as jQuery from 'jquery';

// Suppose we have jQuery.getJSON('/my/url', callback)
const getJSONAsObservable = bindCallback(jQuery.getJSON);
const result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));

转换类操作符 Transformation Operators

转换类的操作符为处理 Observables 提供一种优雅的处理方式,可以在事件源Observable 与响应者 Observer 之间进行操作。

列举一些官网上的转换类操作符:

  • groupBy
  • map
  • mapTo
  • mergeMap
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • pluck
  • scan
  • switchMap
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

map

map 方法跟我们平常使用的方式是一样的,不同的只是这里是将 Observables 源进行转换,然后将结果作为新的 Observable 发出去。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

map(x => 10 * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 10 
// value: 20
// value: 30

mapTo

每次 Observable 源发出一个值时,在输出的 Observable 上发出一个给定的常量值。

// 将每次点击都映射出字符串 'Hi'
import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const greetings = clicks.pipe(mapTo('Hi'));
greetings.subscribe(x => console.log(x));

pluck

将给定的源(对象)映射到其给定属性上。

mapTopluck 都是 map 的一种变形。

使用 pluck 将每次点击映射到目标元素 targettagName 属性上。

import { fromEvent } from 'rxjs';
import { pluck } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const tagNames = clicks.pipe(pluck('target', 'tagName'));
tagNames.subscribe(x => console.log(x));

等价于使用 map:

import { fromEvent } from 'rxjs';
import { pluck, map } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const tagNames = clicks.pipe(map(event => event.target.tagName))
tagNames.subscribe(x => console.log(x));

过滤类操作符 Filtering Operators

过滤类操作符(filtering Operators)类似于转换类操作符,对 Observables 源进行中间过滤操作,发出新的 Observable。

列举一些过滤类操作符:

  • filter
  • first
  • ignoreElements
  • last
  • sample
  • sampleTime
  • single
  • skip
  • skipLast
  • skipUntil
  • skipWhile
  • take
  • takeLast
  • takeUntil
  • takeWhile
  • throttle

first

仅发出 Observable 源发出的第一个值(或满足某些条件的第一个值)。

import { of } from 'rxjs';
import { first } from 'rxjs/operators';

first()(of(1, 2, 3).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1

filter

过滤 Observable 源发出的每一项,只发出满足指定条件的项。

// 仅发出目标为 div 元素d的点击事件
import { fromEvent } from 'rxjs';
import { filter } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const clicksOnDivs = clicks.pipe(filter(ev => ev.target.tagName === 'DIV'));
clicksOnDivs.subscribe(x => console.log(x));

合并类操作符 Filtering Operators

合并类操作符也是创建类操作符,它们具有连接合并功能,它们将多个 Observable 源连接组合到一起, 并将它们的值一一发出。

  • combineLatest
  • concat
  • forkJoin
  • merge
  • race
  • zip

concat

合并给定的 Observable 并创建一个 Observable 输出,像管道一样,它顺序发出给定 Observable 的所有值,等待前一个发送完,然后继续下一个。

将从 0 到 3 的定时器和从 1 到 10 的序列连接起来,并按顺序输出:

import { concat, interval, range } from 'rxjs';
import { take } from 'rxjs/operators';

// 输出0~3,每秒输出一次
const timer = interval(1000).pipe(take(4));
// 输出1到10之间的整数
const sequence = range(1, 10); 
const result = concat(timer, sequence);
result.subscribe(x => console.log(x));

// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

merge

创建一个 Observable 输出,它同时发出每个给定 Observable 输入的所有值。

不同于 concat, merge 不会按顺序发出,它会穿插发出的值,不会一直等待前一个。

import { merge, fromEvent, interval } from 'rxjs';

// 创建点击 Observable
const clicks = fromEvent(document, 'click');
// 创建 间隔为 1s 的定时器 Observable
const timer = interval(1000);
// 合并两个 Observable
const clicksOrTimer = merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

// 输出结果:
// 在有点击事件发生时,点击发出的值会穿插到定时器发出的值中。

operators 为我们提供了许多操作 Observables 的方法,以上我们只是列举其中几种常见的操作符用法,想了解更多可以到 RxJS 官网上查看更多详细的介绍。今天就写到这里,下期我们在继续介绍多播操作符 Multicasting Operators以及其他操作符。

原文发布于微信公众号 - 前端infoQ(webinfoq)

原文发表时间:2019-06-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券