前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJS 入门到搬砖 之 Observable 和 Observer

RxJS 入门到搬砖 之 Observable 和 Observer

作者头像
Cellinlab
发布2023-05-17 20:34:05
6900
发布2023-05-17 20:34:05
举报
文章被收录于专栏:Cellinlab's BlogCellinlab's Blog

# Observable

Observable 是多个值的惰性 Push 集合。他填补了下表中的缺失点:

SINGLE

MULTIPLEXED

Pull

Function

Iterator

Push

Promise

Observable

如,下面是一个 Observable,它在订阅时立即(同步)推送值 123,并且从 subscribe 调用开始后过 1 s 再推送值 4,然后结束。

代码语言:javascript
复制
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

要调用 Observable 并查看这些值,我们需要订阅它:

代码语言:javascript
复制
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('Before subscribe');
observable.subscribe({
  next(x) { console.log('Next: ' + x); },
  error(err) { console.error('Error: ' + err); },
  complete() { console.log('Complete'); }
});
console.log('After subscribe');

// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete 

# Pull vs Push

PullPush 是两种不同的协议,描述了数据生产者和数据消费者如何进通信。

什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。

每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull 出单个返回值来使用它。

ES 2015 中介绍了生成器函数和迭代器 (opens new window)function *),也属于 Pull 系统。调用 iterator.next() 的代码是消费者,从迭代器(生产者)中拉出多个值。

PRODUCER

CONSUMER

Pull

Passive:produces data when requested

Active:decides when data is requested

Push

Active:produces data at its own pace

Passive:reacts to received data

什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。

Promise 是目前 JavaScript 中最常见的 Push 系统类型。Promise (生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。

RxJS 引入了 Observable,一个新的 JavaScript Push 系统。Observable 是一个多值生产者,推送数据给 Observer(消费者)。

  • 函数是一种惰性求值计算,在调用时同步返回单个的值。
  • 生成器是一种惰性求值计算,在迭代时同步返回 0 个或到可能无限多个值。
  • Promise是一种可能(或可能不会)最终返回单个值的计算。
  • Observable是一种惰性求值计算,从调用时起可以同步或异步地返回 0 个或到可能无限多个值。

# Observables as generalizations of functions

Observable 不像 EventEmitter 也不像 Promise 用于多个值。在一些情况下 Observable 会表现地像 EventEmitter,如当使用 RxJS 的 Subject 进行多播时,但通常它们的行为不像 EventEmitter

Observable 类似于零参数的函数,但将它们泛化为允许多个值。

代码语言:javascript
复制
function foo () {
  console.log('Hello');
  return 42;
}

const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42

const y = foo.call(); // same as foo()
console.log(y); 
// Hello
// 42

使用 Observable 改写上面的代码:

代码语言:javascript
复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
});

foo.subscribe(x => {
  console.log(x);
});
// Hello
// 42

foo.subscribe(y => {
  console.log(y);
});
// Hello
// 42

因为 函数 和 Observable 都是惰性计算。如果你不调用函数,console.log('Hello') 就不会被执行。同样对于 Observable,如果你不“调用”它(使用 subscribe), console.log('Hello') 也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。和 EventEmitter 共享副作用并且无论订阅者是否存在都立即触发相反,Observable 没有共享执行并且是惰性计算。

订阅一个 Observable 就是调用一个函数。

部分人觉得 Observable 是异步的,这并不是真的。

代码语言:javascript
复制
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after

使用 Observable 会观察到和函数一样的输出:

代码语言:javascript
复制
console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after

这说明,对 foo 的订阅完全是同步的,就像一个函数一样。

Observable 既能同步也可以异步地传递值。

Observable 和函数之间的区别是什么?Observable 可以随着时间推移“返回”多个值,这是函数无法做到的。

代码语言:javascript
复制
function foo () {
  console.log('Hello');
  return 42;
  return 100; // dead code, will never happen
}

函数只能返回一个值,而 Observable 可以返回多个值:

代码语言:javascript
复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
});

console.log('Before');
foo.subscribe(x => {
  console.log(x);
});
console.log('After');

// Before
// Hello
// 42
// 100
// 200
// After

也可以异步地返回值:

代码语言:javascript
复制
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300);
  }, 1000);
});

console.log('Before');
foo.subscribe(x => {
  console.log(x);
});
console.log('After');

// Before
// Hello
// 42
// 100
// 200
// After
// 300

结论:

  • func.call() 表示同步地返回一个值
  • observable.subscribe() 表示同步或异步地返回 0 或多个值

# Anatomy of an Observable

Observable 使用 new Observable 或一个创建操作符来 created,会被 Observer subscribedexecute 来向 Observer 传递 next / error / complete 通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable 实例中,当其中一些与其他类型相关,如 ObserverSubscription

Observable 核心关注点:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables
# Creating Observables

Observable 构造函数接受一个参数:subscribe 函数

代码语言:javascript
复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
});

Observable 可以使用 new Observable 来创建。通常,Observable 使用创建函数如 offrominterval 等来创建。

# Subscribing to Observables
代码语言:javascript
复制
observable.subscribe(x => {
  console.log(x);
});

这不是个巧合,observable.subscribenew Observable(function subscribe(subscriber) {})subscribe 有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。

这表示订阅调用不会在同一个 Observable 的多个 Observer 之间共享。当使用 Observer 调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {}) 中的 subscribe 函数为给定的 subscriber 运行。对 observable.subscribe 的每次调用都会为给定的 subscriber 触发其对应的设置。

对于 Observable 的订阅就像调用一个函数,提供了可以传递数据的回调。

这和 addEventListener / removeEventListener 等事件处理程序 API 完全不同。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护一个 Observer 列表。

订阅调用只是一种启动 Observable 执行并将值或时间传递给该执行的 Observer 的方法。

# Executing Observables

new Observable(function subscribe(subscriber) {}) 里面的代码表示 Observable 的执行,只发生在每个订阅的 Observer 上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。

Observable 执行可以传递的值类型:

  • Next 通知:发送一个值,如 NumberStringObject
  • Error 通知:发送一个错误,如 Error
  • Complete 通知:不发送值

Next 通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。ErrorComplete 通知在 Observable 执行过程中只可能执行一次,并且只能有一个发生。

代码语言:javascript
复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 严格遵守协议,在 Complete 通知之后的 Next 通知将不会被发送:

代码语言:javascript
复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered to subscribers
});

可以在 subscribe 代码外包一层 try/catch 块,以捕获错误:

代码语言:javascript
复制
import {Observable} from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err);
  }
});

# Disposing Observables Executions

因为 Observable 执行可能是无限的,但是对于 Observer 来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer,一旦 Observer 接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。

observable.subscribe 被调用时,Observer 被附加到新创建的 Observable 执行中,该调用还会返回 Subscription 对象。

代码语言:javascript
复制
const subscription = observable.subscribe(x => console.log(x));

Subscription (opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。

代码语言:javascript
复制
import { from } from 'rxjs';

const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));

// Later
subscription.unsubscribe();

当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源,如可以在函数 subscribe() 中返回自定义取消订阅函数来实现。

代码语言:javascript
复制
const observable = new Observable(function subscribe (subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next(Math.random());
  }, 1000);

  return function unsubscribe () {
    clearInterval(intervalId);
  };
});

就像 observable.subscribe 类似于 new Observable(function subscribe (subscriber) {}), 我们从 subscribe 返回的 unsubscribe 在概念上等同于 subscription.unsubscribe。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。

代码语言:javascript
复制
function subscribe (subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next(Math.random());
  }, 1000);

  return function unsubscribe () {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({
  next: x => console.log(x),
  error: err => console.error(err),
  complete: () => console.log('completed')
});

// Later
unsubscribe();

之所以使用像 ObservableObserverSubscription 的 Rx 类型,是为了安全考虑和 Operator 的可组合性。

# Observer

什么是 Observer Observer 作为消费者消费 Observable 派发的值。Observer 只是一组回调,用于 Observable 派发的每种类型的通知:next, errorcomplete

代码语言:javascript
复制
const observer = {
  next: value => console.log(`Observer got a next value: ${value}`),
  error: error => console.error(`Observer got an error: ${error}`),
  complete: () => console.log('Observer got a complete notification')
};

// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);

Observer 只是有三个回调的对象,用于 Observable 可能派发每种类型的通知。

RxJS 中的 Observer 也可能是部分的。如果没有提供某种回调,Observable 也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer 中找不到对应的回调。

代码语言:javascript
复制
const observer = {
  next: value => console.log(`Observer got a next value: ${value}`),
  error: error => console.error(`Observer got an error: ${error}`)
};

在订阅 Observable 时,也可以不用将回调放在一个 Observer 对象中,只传一个 next 回调函数作为参数就可以。

代码语言:javascript
复制
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));

observable.subscribe 内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer 对象。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/6/27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # Observable
    • # Pull vs Push
      • # Observables as generalizations of functions
        • # Anatomy of an Observable
          • # Creating Observables
          • # Subscribing to Observables
          • # Executing Observables
          • # Disposing Observables Executions
      • # Observer
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档