前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJS速成

RxJS速成

原创
作者头像
solenovex
修改2018-03-26 11:59:22
4.1K6
修改2018-03-26 11:59:22
举报
文章被收录于专栏:草根专栏草根专栏

What is RxJS?

RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能

下面废话不说, 直接切入正题.

准备项目

我使用typescript来介绍rxjs. 因为我主要是在angular项目里面用ts.

全局安装typescript:

代码语言:javascript
复制
npm install -g typescript

全局安装ts-node:

代码语言:javascript
复制
npm install -g ts-node

建立一个文件夹learn-rxjs, 进入并执行:

代码语言:javascript
复制
npm init

安装rxjs:

代码语言:javascript
复制
npm install rxjs --save

RxJS的主要成员

  • Observable: 一系列值的生产者
  • Observer: 它是observable值的消费者
  • Subscriber: 连接observer和observable
  • Operator: 可以在数据流的途中对值进行转换的操作符
  • Subject: 既包括Observable也包括Observer

Observable, Observer, Subscriber的角色关系:

工厂生产杂志, 邮递员去送杂志, 就相当于是Observable, 邮递员给你带来了啥? 带来了杂志, 然后(next)杂志, next杂志.....

把杂志带给了谁? 看看这对夫妇, 可能是丈夫来付账单订杂志, 他就是Subscriber. 而这本女性杂志肯定不是丈夫来看(如果他是正经丈夫的话), 而妻子没有直接去订阅杂志, 但是她看这本杂志有用(知道怎么去用它).

所以可以这样理解, 丈夫(Subscriber)把Observable和Observer联系到了一起, 就是Subscriber为Observable提供了一个Observer(丈夫订杂志, 告诉快递员把货给他媳妇就行).

Observable可以在Observer上调用三种方法(快递员跟他妻子可能会有三种情况...好像这么说不太恰当), 当Observable把数据(杂志)传递过来的时候, 这三种情况是:

  • next(), 这期杂志送完了, 等待下一期吧
  • error(), 送杂志的时候出现问题了, 没送到.
  • complete(), 订的杂志都处理完了, 以后不送了.

下面这个图讲的就是从Observable订阅消息, 并且在Observer里面处理它们:

Observable允许:

  • 订阅/取消订阅它的数据流
  • 发送下一个值给Observer
  • 告诉Observer发生了错误以及错误的信息
  • 告诉Observer整个流结束了.

Observer可以提供:

  • 一个可以处理流(stream)上的next的值的function
  • 处理错误的function
  • 处理流结束的function

创建Observable

  • Observable.from(), 把数组或iterable对象转换成Observable
  • Observable.create(), 返回一个可以在Observer上调用方法的Observable.
  • Observable.fromEvent(), 把event转换成Observable.
  • Observable.fromPromise(), 把Promise转换成Observable.
  • Observable.range(), 在指定范围内返回一串数.

Observable.from()

observable_from.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable"; // 这里没有使用Rx对象而是直接使用其下面的Observable对象, 因为Rx里面很多的功能都用不上.
import 'rxjs/add/observable/from'; // 这里我需要使用from 操纵符(operator)

let persons = [
    { name: 'Dave', age: 34, salary: 2000 },
    { name: 'Nick', age: 37, salary: 32000 },
    { name: 'Howie', age: 40, salary: 26000 },
    { name: 'Brian', age: 40, salary: 30000 },
    { name: 'Kevin', age: 47, salary: 24000 },
];

let index = 1;
Observable.from(persons)
    .subscribe(
        person => {
            console.log(index++, person);
        },
        err => console.log(err),
        () => console.log("Streaming is over.")
    );
复制代码
复制代码

subscribe里面有3个function, 这3个function就是Observer.

第一个function是指当前这个person到来的时候需要做什么;

第二个是错误发生的时候做什么;

第三个function就是流都走完的时候做什么.

注意, 是当执行到.subscribe()的时候, Observable才开始推送数据.

运行这个例子需要执行下面的命令:

代码语言:javascript
复制
ts-node observable_from.ts

Observable.create()

Observable.create是Observable构造函数的一个别名而已. 它只有一个参数就是subscribe function. 

observable_creates.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";

function getData() {

    let persons = [
        { name: 'Dave', age: 34, salary: 2000 },
        { name: 'Nick', age: 37, salary: 32000 },
        { name: 'Howie', age: 40, salary: 26000 },
        { name: 'Brian', age: 40, salary: 30000 },
        { name: 'Kevin', age: 47, salary: 24000 },
    ];

    return Observable.create(
        observer => { // 这部分就是subscribe function
            persons.forEach(p => observer.next(p));
            observer.complete();
        }
    );
}

getData()
    .subscribe(
        person => console.log(person.name),
        err => console.error(err),
        () => console.log("Streaming is over.")
    );
复制代码
复制代码

create里面的部分是subscribe function. 这部分可以理解为, 每当有人订阅这个Observable的时候, Observable会为他提供一个Observer.

在这里面, observer使用next方法对person进行推送. 当循环结束的时候, 使用complete()方法通知Observable流结束了.

尽管getDate里面create了Observable, 但是整个数据流动并不是在这时就开始的. 在这个地方, 这只不过是个声明而已.

只有当有人去订阅这个Observable的时候, 整个数据流才会流动.

运行该文件:

RxJS Operator(操作符)

Operator是一个function, 它有一个输入, 还有一个输出. 这个function输入是Observable输出也是Observable.

在function里面, 可以做一些转换的动作

下面是几个例子:

代码语言:javascript
复制
observablePersons.filter(p => p.age > 40);

这个filter function和数组的filter类似, 它接受另一个function(也可以叫predicate)作为参数, 这个function提供了某种标准, 通过这个标准可以判定是否当前的元素可以被送到订阅者那里.

代码语言:javascript
复制
p => p.age > 40

这个function, 是pure function, 在functional programming(函数式编程)里面, pure function是这样定义的: 如果参数是一样的, 无论外界环境怎么变化, 它的结果肯定是一样的.

pure function不与外界打交道, 不保存到数据库, 不会存储文件, 不依赖于时间....

而这个filter function呢, 在函数式编程里面是一个high order function.

什么是High order function

如果一个function的参数可以是另一个function, 或者它可以返回另一个function, 那么它就是High Order function.

Marble 图

首先记住这个网址: http://rxmarbles.com/

有时候您可以通过文档查看operator的功能, 有时候文档不是很好理解, 这时你可以参考一下marble 图.

例如 map:

可以看到map接受一个function作为参数, 通过该function可以把每个元素按照function的逻辑进行转换.

例如 filter:

filter就是按条件过滤, 只让合格的元素通过.

例 debounceTime (恢复时间):

如果该元素后10毫秒内, 没有出现其它元素, 那么该元素就可以通过.

例 reduce:

这个也和数组的reduce是一个意思.

例子

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/reduce';


let persons = [
    { name: 'Dave', age: 34, salary: 2000 },
    { name: 'Nick', age: 37, salary: 32000 },
    { name: 'Howie', age: 40, salary: 26000 },
    { name: 'Brian', age: 40, salary: 30000 },
    { name: 'Kevin', age: 47, salary: 24000 },
];

Observable.from(persons)
    .map(p => p.salary)
    .reduce((total, current) => total+ current, 0)
    .subscribe(
        totalSalary => console.log(`Total salary is ${totalSalary}`),
        err => console.log(err)
    );
复制代码
复制代码

这个例子非常的简单, 典型的map-reduce, 就不讲了.

结果如下:

用现实世界中炼钢生产流程的例子来解释使用Operator来进行Reactive数据流处理的过程:

原料(矿石)整个过程中会经过很多个工作站, 这里每个工作站都可以看作是RxJS的operator, 原料经过这些operator之后, 成品就被生产了出来.

每个工作站(operator)都是可以被组合使用的, 所以可以再加几个工作站也行.

错误处理

Observable是会发生错误的, 如果错误被发送到了Observer的话, 整个流就结束了.

但是做Reactive编程的话, 有一个原则: Reactive的程序应该很有弹性/韧性.

也就是说, 即使错误发生了, 程序也应该继续运行.

但是如果error function在Observer被调用了的话, 那就太晚了, 这样流就停止了.

那么如何在error到达Observer之前对其进行拦截, 以便流可以继续走下去或者说这个流停止了,然后另外一个流替它继续走下去?

错误处理的Operators:

  • error() 被Observable在Observer上调用
  • catch() 在subscriber里并且在oserver得到它(错误)之前拦截错误,
  • retry(n) 立即重试最多n次
  • retryWhen(fn) 按照参数function的预定逻辑进行重试

使用catch()进行错误处理:

observable_catch.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';

function getFromGoogle(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://google.com');
        observer.error({
            message: 'Google can\'t be reached.',
            status: 404,
        });
        observer.complete();
    });
}

function getFromBing(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://global.bing.com');
        observer.complete();
    });
}


function getFromBaidu(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://www.baidu.com');
        observer.complete();
    });
}

getFromGoogle()
    .catch(err => {
        console.error(`Error: ${err.status}: ${err.message}`);
        if(err.status === 404) {
            return getFromBaidu();
        } else {
            return getFromBing();
        }
    })
    .map(x => `The site is : ${x}`)
    .subscribe(
        x => console.log('Subscriber got: ' + x),
        err => console.error(err),
        () => console.log('The stream is over.')
    );
复制代码
复制代码

在subscribe之前调用catch, catch里可以进行流的替换动作.

运行结果如下:

相当于:

Hot 和 Cold Observable

Cold: Observable可以为每个Subscriber创建新的数据生产者

Hot: 每个Subscriber从订阅的时候开始在同一个数据生产者那里共享其余的数据.

从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.

举个例子:

Cold: 就相当于我在腾讯视频买体育视频会员, 可以从头看里面的足球比赛.

Hot: 就相当于看足球比赛的现场直播, 如果来晚了, 那么前面就看不到了.

Share Operator

share() 操作符允许多个订阅者共享同一个Observable. 也就是把Cold变成Hot.

例子 observable_share.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/share';

const numbers = Observable
    .interval(1000)
    .take(5)
    .share();

function subscribeToNumbers(name) {
    numbers.subscribe(
        x => console.log(`${name}: ${x}`)
    );
}

subscribeToNumbers('Dave');

const anotherSubscription = () => subscribeToNumbers('Nick');

setTimeout(anotherSubscription, 2500);
复制代码
复制代码

这里interval是每隔1秒产生一个数据, take(5)表示取5个数, 也就是1,2,3,4,5.

然后share()就把这个observable从cold变成了hot的.

后边Dave进行了订阅.

2.5秒以后, Nick进行了订阅.

最后结果是:

Subject

Subject比较特殊, 它即是Observable又是Observer.

作为Observable, Subject是比较特殊的, 它可以对多个Observer进行广播, 而普通的Observable只能单播, 它有点像EventEmitters(事件发射器), 维护着多个注册的Listeners.

作为Observable, 你可以去订阅它, 提供一个Observer就会正常的收到推送的值. 从Observer的角度是无法分辨出这个Observable是单播的还是一个Subject.

从Subject内部来讲, subscribe动作并没有调用一个新的执行来传递值, 它只是把Observer注册到一个列表里, 就像其他库的AddListener一样.

作为Observer, 它是一个拥有next(), error(), complete()方法的对象, 调用next(value)就会为Subject提供一个新的值, 然后就会多播到注册到这个Subject的Observers.

例子 subject.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Subject } from "rxjs/Subject";

const subject = new Subject();

const subscriber1 = subject.subscribe({
    next: (v) => console.log(`observer1: ${v}`)
});
const subscriber2 = subject.subscribe({
    next: (v) => console.log(`observer2: ${v}`)
});

subject.next(1);
subscriber2.unsubscribe();
subject.next(2);

const subscriber3 = subject.subscribe({
    next: (v) => console.log(`observer3: ${v}`)
});

subject.next(3);
复制代码
复制代码

订阅者1,2从开始就订阅了subject. 然后subject推送值1的时候, 它们都收到了. 

然后订阅者2, 取消了订阅, 随后subject推送值2, 只有订阅者1收到了.

后来订阅者3也订阅了subject, 然后subject推送了3, 订阅者1,3都收到了这个值.

下面是一个angular 5的例子:

app.component.html:

复制代码
复制代码
代码语言:javascript
复制
<h3>从Subject共享Observable到多个Subscribers</h3>
<input type="text" placeholder="start typing" (input)="mySubject.next($event)" (keyup)="mySubject.next($event)">

<br> Subscriber to input events got {{inputValue}}
<br>
<br> Subscriber to keyup events got {{keyValue}}
复制代码
复制代码

app.component.ts:

复制代码
复制代码
代码语言:javascript
复制
import { Component } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  title = 'app';

  keyValue: string;
  inputValue: string;

  mySubject: Subject<Event> = new Subject();

  constructor() {
    // subscriber 1
    this.mySubject.filter(({ type }) => type === 'keyup')
      .map(e => (<KeyboardEvent>e).key)
      .subscribe(value => this.keyValue = value);

    // subscriber 2
    this.mySubject.filter(({ type }) => type === 'input')
      .map(e => (<HTMLInputElement>e.target).value)
      .subscribe(value => this.inputValue = value);
  }
}
复制代码
复制代码

input和keyup动作都把event推送到mySubject, 然后mySubject把值推送给订阅者, 订阅者1通过过滤和映射它只处理keyup类型的事件, 而订阅者2只处理input事件.

效果:

BehaviorSubject

BehaviorSubject 是Subject的一个变种, 它有一个当前值的概念, 它会把它上一次发送给订阅者值保存起来, 一旦有新的Observer进行了订阅, 那这个Observer马上就会从BehaviorSubject收到这个当前值.

也可以这样理解BehaviorSubject的特点:

  • 它代表一个随时间变化的值, 例如, 生日的流就是Subject, 而一个人的年龄流就是BehaviorSubject.
  • 每个订阅者都会从BehaviorSubject那里得到它推送出来的初始值和最新的值.
  • 用例: 共享app状态.

例子 behavior-subject.ts:

复制代码
复制代码
代码语言:javascript
复制
import { BehaviorSubject } from "rxjs/BehaviorSubject";

const subject = new BehaviorSubject(0);

subject.subscribe({
    next: v => console.log(`Observer1: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: v => console.log(`Observer2: ${v}`)
});

subject.next(3);
复制代码
复制代码

效果:

常用Operators:

concat 

concat: 按顺序合并observables. 只会在前一个observable结束之后才会订阅下一个observable.

它适合用于顺序处理, 例如http请求.

例子: 

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/concat';

let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');

Observable.concat(firstReq, secondReq)
    .subscribe(res => console.log(res));
复制代码
复制代码

效果:

merge

把多个输入的observable交错的混合成一个observable, 不按顺序.

merge实际上是订阅了每个输入的observable, 它只是把输入的observable的值不带任何转换的发送给输出的Observable. 只有当所有输入的observable都结束了, 输出的observable才会结束. 任何在输入observable传递来的错误都会立即发射到输出的observable, 也就是把整个流都杀死了 .

例子:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/merge';

let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');

Observable.merge(firstReq, secondReq)
    .subscribe(res => console.log(res));
复制代码
复制代码

效果:

mergeMap (原来叫flatMap)

mergeMap把每个输入的Observable的值映射成Observable, 然后把它们混合成一个Observable.

mergeMap可以把嵌套的observables拼合成非嵌套的observable.

它有这些好处:

  • 不必编写嵌套的subscribe()
  • 把每个observable发出来的值转换成另一个observable
  • 自动订阅内部的observable并且把它们(可能)交错的合成一排.

这个还是通过例子来理解比较好:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/mergeMap';

function getData() {
    const students = Observable.from([
        { name: 'Dave', age: 17 },
        { name: 'Nick', age: 18 },
        { name: 'Lee', age: 15 }
    ]);

    const teachers = Observable.from([
        { name: 'Miss Wan', age: 28 },
        { name: 'Mrs Wang', age: 31 },
    ]);

    return Observable.create(
        observer => {
            observer.next(students);
            observer.next(teachers);
        }
    );
}

getData()
    .mergeMap(persons => persons)
    .subscribe(
        p => console.log(`Subscriber got ${p.name} - ${p.age}`)
    );
复制代码
复制代码

效果:

switchMap

switchMap把每个值都映射成Observable, 然后使用switch把这些内部的Observables合并成一个.

switchMap有一部分很想mergeMap, 但也仅仅是一部分像而已.

因为它还具有取消的效果, 每次发射的时候, 前一个内部的observable会被取消, 下一个observable会被订阅. 可以把这个理解为切换到一个新的observable上了.

这个还是看marble图比较好理解:

例子: 

复制代码
复制代码
代码语言:javascript
复制
// 立即发出值, 然后每5秒发出值
const source = Rx.Observable.timer(0, 5000);
// 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值
const example = source.switchMap(() => Rx.Observable.interval(500));
// 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));
复制代码
复制代码

更好的例子是: 网速比较慢的时候, 客户端发送了多次重复的请求, 如果前一次请求在2秒内没有返回的话, 那么就取消前一次请求, 不再需要前一次请求的结果了, 这里就应该使用debounceTime配合switchMap.

mergeMap vs switchMap的例子

mergeMap:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';

const outer = Observable.interval(1000).take(2);

const combined = outer.mergeMap(x => {
    return Observable.interval(400)
        .take(3)
        .map(y => `outer ${x}: inner ${y}`);
});

combined.subscribe(res => console.log(`result ${res}`));
复制代码
复制代码

效果:

switchMap:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';

const outer = Observable.interval(1000).take(2);

const combined = outer.switchMap(x => {
    return Observable.interval(400)
        .take(3)
        .map(y => `outer ${x}: inner ${y}`);
});

combined.subscribe(res => console.log(`result ${res}`));
复制代码
复制代码

zip

zip操作符也会合并多个输入的observables成为一个observable. 多个输入的observable的值, 按顺序, 按索引进行合并, 如果某一个observable在该索引上的值还没有发射值, 那么会等它, 直到所有的输入observables在该索引位置上的值都发射出来, 输出的observable才会发射该索引的值.

例子:

复制代码
复制代码
代码语言:javascript
复制
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';

let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);

Observable
    .zip(age$,
        name$,
        isDev$,
        (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
    .subscribe(x => console.log(x));
复制代码
复制代码

效果:

就不往下写了, 其实看文档就行, 还是概念比较重要.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • What is RxJS?
  • 准备项目
  • RxJS的主要成员
    • Observable, Observer, Subscriber的角色关系:
    • 创建Observable
      • Observable.from()
        • Observable.create()
        • RxJS Operator(操作符)
        • Marble 图
          • 例子
          • 错误处理
          • Hot 和 Cold Observable
            • Share Operator
            • Subject
            • BehaviorSubject
            • 常用Operators:
              • concat 
                • merge
                  • mergeMap (原来叫flatMap)
                    • switchMap
                      • mergeMap vs switchMap的例子
                        • zip
                        相关产品与服务
                        云直播
                        云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档