前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >创建 Observable

创建 Observable

作者头像
阿宝哥
发布2019-11-05 16:20:05
1K0
发布2019-11-05 16:20:05
举报
文章被收录于专栏:全栈修仙之路全栈修仙之路

在 RxJS 中为我们提供了很多创建 Observable 对象的方法,其中 create 是最基本的方法。它是 Observable 类的静态属性 —— static create: Function,也是创建 Observable 对象的工厂方法。

import { Observable } from "rxjs";

const observable$ = Observable.create(observer => {
  observer.next('Semlinker');
  observer.next('Lolo');
});
	
observable$.subscribe(value => { // 执行订阅操作	
  console.log(value);
});

以上代码运行后,控制台会依次输出 ‘Semlinker’ 和 ‘Lolo’ 。

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

import { Observable } from "rxjs";

const observable$ = Observable.create(observer => {
   observer.next('Semlinker');
   observer.next('Lolo');
});
	
console.log('start');
observable$.subscribe(function(value) {
  console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end

当然我们也可以用它处理异步行为:

import { Observable } from "rxjs";

const observable$ = Observable.create(observer => {
  observer.next('Semlinker');
  observer.next('Lolo');
		
  setTimeout(() => {
	observer.next('RxJS Observable');
  }, 300);
});
	
console.log('start');
observable$.subscribe(function(value) {
   console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end
RxJS Observable

从以上例子中,我们可以得出一个结论 —— Observable 可以应用于同步和异步的场合。

Observer

Observer(观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。

Observer 接口定义

interface Observer<T> {
  closed?: boolean; // 标识是否已经取消对Observable对象的订阅
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

Observer 中的三个方法的作用:

  • next —— 每当 Observable 发送新值的时候,next 方法会被调用。
  • error —— 当 Observable 内发生错误时,error 方法就会被调用。
  • complete —— 当 Observable 数据终止后,complete 方法会被调用。在调用 complete 方法之后,next 方法就不会再次被调用。

接下来我们来看个具体示例:

import { Observable } from "rxjs";

const observable$ = Observable.create(observer => {
	 observer.next('Semlinker');
	 observer.next('Lolo');
	 observer.complete();
	 observer.next('not work');
});
	
// 创建一个观察者
const observer = {
  next: function(value) {
	console.log(value);
  },
  error: function(error) {
	console.log(error);
  },
  complete: function() {
	console.log('complete');
  }
}

// 订阅已创建的observable对象
observable$.subscribe(observer);

以上代码运行后,控制台的输出结果:

Semlinker
Lolo
complete

上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work

另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:

var observer = {
	next: function(value) {
		console.log(value);
	}
};

有时候 Observable 可能是一个无限的序列,比如监听 click 事件,对于这种场景,complete 方法就永远不会被调用。

我们也可以在调用 Observable 对象的 subscribe 方法时,依次传入 next、error、complete 三个函数,来创建观察者:

observable.subscribe(
    value => { console.log(value); },
    error => { console.log('Error: ', error); },
    () => { console.log('complete'); }
);

Subscription

有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription 对象的 unsubscribe 方法来释放资源。具体示例如下:

import { timer } from "rxjs";

const source$ = timer(1000, 1000);

// 取得subscription对象
const subscription = source$.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});

setTimeout(() => {
    subscription.unsubscribe();
}, 5000);

常见 creation 操作符

除了上面介绍的 create 方法之外,RxJS 还提供了很多操作符,用于创建 Observable 对象,比如:

  • of
  • from
  • range
  • empty
  • throwError
  • fromEvent
  • interval
  • timer
of
import { of } from "rxjs";

const source$ = of('Semlinker', 'Lolo');

source$.subscribe({
    next: function(value) {
      console.log(value);
    },
    complete: function() {
      console.log('complete!');
    },
    error: function(error) {
      console.log(error);
    }
});

以上代码运行后,控制台的输出结果:

Semlinker
Lolo
complete!
from
  1. 数据源为数组
import { from } from "rxjs";

const source$ = from([1, 2, 3]); // 也支持字符串,比如"Angular"

source$.subscribe({
  next: function(value) {
    console.log(value);
  },
  complete: function() {
    console.log("complete!");
  },
  error: function(error) {
    console.log(error);
  }
});

以上代码运行后,控制台的输出结果:

1
2
3
complete!
  1. 数据源为 Promise 对象
import { from } from "rxjs";

const promiseSource$ = from(new Promise(resolve => resolve("Hello World!")));

promiseSource$.subscribe({
  next: function(value) {
    console.log(value);
  },
  complete: function() {
    console.log("complete!");
  },
  error: function(error) {
    console.log(error);
  }
});

以上代码运行后,控制台的输出结果:

Hello World!
complete!
range
import { range } from "rxjs";

const source$ = range(1, 5);
const example = source$.subscribe(val => console.log(val));

以上代码运行后,控制台的输出结果:

1
2
3
4
5
empty

empty就是产生一个直接完结的Observable对象,没有参数,不产生任何数据,直接完结。

import { empty } from "rxjs";

const subscribe = empty().subscribe({
  next: () => console.log("Next"),
  complete: () => console.log("Complete!")
});

以上代码运行后,控制台的输出结果:

Complete!
throwError
import { throwError } from "rxjs";

const source$ = throwError("This is an error!");
source$.subscribe({
  next: val => console.log(val),
  complete: () => console.log("Complete!"),
  error: val => console.log(`Error: ${val}`)
});

以上代码运行后,控制台的输出结果:

Error: This is an error! # throwError 只做一件事就是抛出异常。
fromEvent
import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";

const source$ = fromEvent(document, "click");
const example$ = source$.pipe(map(event => `Event time: ${event.timeStamp}`));
const subscribe = example$.subscribe(val => console.log(val));
interval
import { interval } from "rxjs";

const source$ = interval(1000);
source$.subscribe(val => console.log(val));

以上代码运行后,控制台的输出结果:

0
1
2
...

interval 支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。

timer
import { timer } from "rxjs";

const source$ = timer(1000, 5000);
const subscribe = source$.subscribe(val => console.log(val));

以上代码运行后,控制台的输出结果:

0 # 1s后
1 # 5s后
2 # 5s后
...

timer 支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 也可以只传一个参数,比如:

import { timer } from "rxjs";

const source$ = timer(1000);
source$.subscribe(
  val => console.log(val),
  () => console.error("error!"),
  () => console.log("complete!")
);

以上代码运行后,控制台的输出结果:

0
complete!

参考资源

  • 30 天精通RxJS (05): 建立Observable(一)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018/09/15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Observer
  • Subscription
  • 常见 creation 操作符
    • of
      • from
        • range
          • empty
            • throwError
              • fromEvent
                • interval
                  • timer
                  • 参考资源
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档