Rx.js 入门笔记

基本概念

  • Observable 可观察者, 生产数据
  • Observer 观察者, 消费数据
  • Subscription 订阅/可清理对象, 用以清理资源或中断Observeable执行
  • Subject 多播主体, 向多个订阅者广播数据
  • Operators 操作符, 处理数据的函数

数据获取方式, 推送/拉取

  • 数据的获取方式,表示了数据生产者和数据消费者之间的通信关系
  • 拉取: 由消费者控制何时获取数据, 例如:请求状态管理器中的状态指
  • 推送: 有生产者控制何时获取数据, 例如:向服务器请求数据

可观察者 Observable

  • 基础创建
import { Observable } from 'rxjs';

const ob = Observable.create(observer =>{
    // 定义观察者操作
    
    // next 推送数据
    observer.next(1)
    
    // complete 完成后台执行
    observer.complete();
    
    // unsubscrit 自定义中断订阅
    return () =>{....}
    
})
  • 其他创建方法, of, from, fromEvent, fromPromise, interval, range 等API
  • 订阅 subscribe() 当可观察者未被订阅时,将不会被执行
observable.subscribe( data => {
    ...
    执行数据操作
} )
  • 执行
  • next: 推送通知
  • error: 异常通知
  • complete: 完成通知
import { Observable } from 'rxjs';

const ob = Observable.create(observer =>{
    try{    
        observer.next(1)
        observer.complete();
    }catch(err) {
        observer.error(err);   
    }
})

观察者

  • 观察者定义了如何处理数据或错误
  • 观察者可配置三种数据处理方法
    • 'next':正常处理
    • 'error': 错误处理
    • 'complete': 完成处理
const observer = {
    next: data => console.log(data),
    error: err => console.error(err),
    complete: () => console.log('end')
}

// 执行订阅
observable.subscribe(observer);

订阅 Subscription

  • 提供清理数据,取消Observable执行, 取消订阅
const subscription = observable.subscribe(data => {....});

subscription.unsubscribe();

多播 Subject

  • 提供向多个订阅,发送通知的能力
  • subject 本身是观察者, 可以作为Observable 参数
// 创建对象
import { Subject } from 'rx.js';
const subject = new subject();

// 订阅
const A = subject.subscribe(data => console.log('a', data);

const B = subject.subscribe({
    next: data => console.log('b', data),
    error: err => console.error(err),
    complete: () => console.log('end')
    
});

// 发送通知
subject.nect(1);

// 解除订阅
A.unsubscribe();
  • 作为参数
const subject = new Subject();
const observable = Observable.from([1, 2]);

subject.subscribe(data =>{
    console.log(data)
})

// 执行订阅
observable.subscribe(subject);

>>> 1
>>> 2
  • multicast
    • 多播Observable 底层使用该操作符, 实现对多个订阅的通知
    • 通过该操作符,可以控制推送的时机
// 官方例子

// 创建Observable
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 绑定订阅, 此时调用的是 subject.subscribe(), 所以并不会推送通知
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 开始执行, 在底层使用了 `source.subscribe(subject)`:
multicasted.conne
  • 多播变体
  • BehaviorSubject : 缓存当前已发送值
  • ReplaySubject : 记录历史值, 缓存以当前值向前某几位值, 或某段时间前的值
  • AsyncSubject :全体完成后,再发送通知

操作符

  • 声明式的函数调用(FP), 不修改原Observable, 而是返回新的Observable
  • 实例操作符: Observable 实例方法, 例如: multiplyByTen
  • 静态操作符: Observable 类方法 例如: of from interval
  • (操作符分类)[https://cn.rx.js.org/manual/overview.html#h15]

常用操作符

创建

  • of: 发送配置参数
import { of } from 'rxjs';

const data$ = of({id:1}, {id:2});

data$.subscribe(data => console.log(data));

// print
{id:1} ---- {id:2}
  • from: 输出可遍历对象子项
import { from } from 'rxjs';

const data$ = from([1, 2, 3]);
data$.subscribe(data => console.log(data));


// print
1 ---- 2 ---- 3
  • fromEvent: 绑定事件
const ele = document.getElementsById('ele');
const event$ = fromEvent(ele, 'click');

event$.subscribe(event => {
    console.log(evnet.target)
})

// 事件触发时,发出事件流
  • interval: 间隔发送(计时器)
const interval$ = interval(100);
interval$.subscribe( num => console.log(num) )
    
  100ms  100ms  100ms
0 ---- 1 ---- 2 ---- 3 ....
  • timer: 延时发送
// 初次延时
const timer$ = timer(100); 
timer$.subscribe(data => console.log(data));
console.log('run');

// print 
'run'
0

// 延时后,间隔发送
const timer$ = timer(100, 1000); // 100: 延时, 1000: 发送间隔 
timer$.subscribe(data => console.log(data));
console.log('run');

// print 
'run'
// 100ms
0
// 1000ms
1
.....
  • empty: 发送空信息
empty().subscribe(data => console.log(data);

// 无数据输出
  • never: 不发送信息
  • doc
never.subscribe(data => console.log(data));

// 无数据输出
  • trow: 发送错误
throw('This is error').subscribe({
    next: log,
    error: log,
    complete: () => log('complete')
})

// print
'This is error'
  • range: 发送指定队列
// range(star, len) start: 起始值, len: 数据长度
range(2, 2).subscribe(num => console.log(num));

// print
2 ---- 3

转换符

  • map: 函数应用于每个数据源
from([1, 2, 3]).map(item => item++).subscribe(end => console.log(end)

// print
2 ---- 3 ---- 4
  • mapTo: 使用配置指,替换源数据
from([1, 2, 3]).mapTo('value').subscribe(end => console.log(end));

// print
value ---- value ---- value
  • mergeMap: 拍平数据, 返回新Observable
  • doc
// 提取对象内数组数据,并转换为单一数据向外发送
const obj$ = of({arr: [1, 2, 3]});

obj$.pluck('arr')
    .mergeMap(arr => from(arr) ) // 拍平数据
    .subscribe(num => console.log(num));
    
// print

1 ---- 2 ---- 3 
// 这里将数组拆解,作为单一项目向外发送
  • mergeMapTo: 拍平数据, 使用配置指替代源指, 类似 map与mapTo的关系
const obj$ = of({arr: [1, 2, 3]});

obj$.pluck('arr')
    .mergeMapTo('str') // 拍平数据
    .subscribe(num => console.log(num));

// print
str ---- str ---- str
  • pluck;取出指定属性值
const obj$ = of([{nickname: 'coco', age:  12}, {nickname: 'jeck', age: 23} ]);

obj$.pluck(0, 'nickname').subscribe(...);

// print
'coco'
  • concatMap: 合并流,前一个流将作为后一个流的处罚机制
  • doc
const prefix$ = from(['hot', 'remind']);
const next$ = prefix$.concatMap( pre => form(['news', 'info']).map(item => pre +' '+next ) ).subscribe(...)

// print

'hot' ------------------------ 'remind'
'hot news'--- 'hot info' ----- 'remind news' --- 'remind info'

/*
** 后续Observable 可以操作前一个Oberservable发出的数据流,
** 也可以只发送自己的数据留,前一个留只作为触发机制
  • concatMapTo: 类似 map 与 mapTo , 替换源数据值
  • scan: 记录上次回调执行结果
  • doc
// 第一参数为执行回调, 第二参数为初始值
from([1, 2, 3]).scan((a, b) => a+b, 0).subscriba(...)

// print
1 ---- 2 ---- 3
1+0    1+2    3+3
1 ---- 3 ---- 6

// 其他特殊操作
from([1, 2]).scan((a, b) => [...a, b], []);

// print
[1] --- [1, 2]

// 使用数组记录每次发送的值
  • repeat; 重复发送流
  • doc
const num$ = of(1);
num$.repeat(2).subscribe(num => console.log(num) );

// print 
1 ---- 1
  • margeScan: 类似数据流经过scan后在经过 margeMap 处理
// 需要赋初始值,否则结果为NaN, (undefined + number)
form([1, 2]).margeScan( (a, b) => of( a + b), 0 ).subscribe(...)


// print 
1 --- 3 ---- 6

过滤

  • debounceTime: 上游停止发送一段时间后,将最新值发出
from([1, 2, 3]).debounceTime(1000).subscribe(...)

// print
3
  • defultIfEmpty: 上有完成未发出数据,将使用默认值
empty().defultIfEmpty(null).subscribe(...);

// print
null
  • delay 向后延时发送
of(1).delay(1000).subscribe(...)
console.log('after');

// print

'after'
1
  • delayWhen 延时至下游Obersevabel发送数据时,再将数据向下流
of(1).delayWhen( data => interval(1000) ).subscribe(...)

// print
1000ms
------- 1
  • do 不中断流的情况下执行自定义回调
form([1, 2, 3]).do({
 next: num => console.log(num),
 error: error => console.log(error),
 complete: () => console.log('complete')
}).subscribe(...)

// print 
11 ---- 12 ---- 13
  • elementAt 只发送某一次数据
interval(500).elementAt(2).subscribe(...);

// print
2
  • filter 发送符合条件数据
// 过滤偶数
interval(1000).filter( num => num%2 === 0 ).subscribe(...);

// print
0 ---- 2 ---- 4 ....
  • find 发送第一个符合条件数据
interval(1000).find(num => num === 2).subscribe(...);

// print
2
  • findIndx 发送第一个符合条件数据的编号
from([1, 11, 13]).findIndex( num => num > 10 ).subscribe(...);

// print
1
  • first 发送第一个值
interval(100).first().subscribe(...)

// print
0
  • last 发送最后一个指
from([1, 2, 3]).last().subscribe(...)

// print
3
  • every 验证数据每一项都否符合要求, 返回布尔值
range(0, 3).every(num < 3).subscribe(...);

// print
true

// 完成时,返回最终值
  • isEmpty 验证数据是否为空
empty().isEmpty().subscribe(...);

// print
true
  • max 通过比较函数,返回最大值
  • min 通过比较函数, 返回最小值
// 通过自定义函数做判断
from(['coco', 'py', 'nobody']).max((a, b) =>  a.length > b.length ? 1 : -1 ).subscribe(...);


// print
'nobody'
  • take 只发送前n个数据
interval(1000).take(2).subscribe(...)

// print
0 ---- 1
  • takeLast 只发送最后n个数据, 完成后一同发出
range(0, 10).takelast(2).subscribe(...);

// print
9 ---- 10
  • takeUntil 发送数据直到下游Oberservable开始发送数据
interval(500).takeUnitl( of('down').delay(1000) ).subscrivbe(...)

// print
0
  • takeWhile 当条件不满足时终止
interval(100).takeWhile( num => num < 3 ).subscribe(...)

// print
0 --- 1 -- 2
组合
  • switch: 当上游发出数据时,将新开一个下游Obsevable, 并中断前一下游数据流
  • doc
interval(1000).switchMap(pre => interval(300)).subscribe(...);

// print
0 -------------- 1 ----------- 
0 --- 1 --- 2 --- 0 --- 1 --- 2

// 需要注意的是当上游发送频率大于下游时,下游将无法正常发送数据.
  • concat 合并多个不同的流,按先后顺序输出
const a$ = range(0, 3)
const b$ = range(10, 3)

a$.contact(b$).subscribe(...);

// print
0 --- 1 --- 2 --- 10 --- 11 --- 12
  • concat 按顺序执行订阅,只有当一个内部Observable后再执行下一个Observable
range(0, 3)
.do(num => console.log(num)
.map(num => of('next'))
.concatAll()
.subscribe(...)

// print

0 --- next --- 1 --- next --- 2 --- next

/*
** 这里将每个上游值转化为Obervable, 当上游执行完
** 将调用下游值,将数据合并到同一流中
*/
  • merge 合并多个流,拍平数据
const first$ = interva(500).mapTo('first');
const secend$ = interva(500).mapTo('secend');

first$.merge(secend$).subscribe(...)

// print 
'first' --- 'secend' --- 'first' --- 'secend'
  • zip 将多个流数据合并,依次发出
const a$ = range(0, 4);
const b$ = from(['coco', 'jeck', 'mike']);
const c$ = of(true, true, false);

Observable.zip( a$, b$, c$ ).subscribe(...);

// print

[ 0, 'coco', true ] ---- [ 1, 'jeck', true ] ---- [ 2, 'mike', false ]

/*
** 注意;只有当所有子流同次,都有数据发送时,才能获取最终数据 
** 上面例子中 a$ 将多发送一次数据,当最终不会被输出
*/

错误处理

  • catch 捕获错误,返回新的Observable 或 error
  • retry 重试Observable, 达到次数后终止
  • retryWhen

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券