一本书里面内容较多, 因此分成了多篇 Post, 可以从此处看到相关文章:
首先回顾一下 JS 原生的一些特性:
然后是 NodeJS 的一些实现
额外需要注意的是: JS 原生是单线程, 但是一些 underlying platform (Browser and NodeJS).
一个简单的 Promise
const someFutureValue = new Promise((resolve, reject) => {
const value = doSomething()
if (value === null) {
reject(new Error("Ooops!"));
}
resolve(value);
});
someFutureValue.then(doSomethingElseWithThatValue);
Promise 和上方的一个 ADT: Validation 很是相似, 可以看到 Promise 有一些 ADT 的特性
Promise.resolve('aa').then(identity);
and Promise.resolve('aa');
两者相同, 都最终生成 Promise { 'aa' }
Promise.resolve("aabbcc").then(unique).then(join).then(toUpper);
Promise.resolve("aabbcc").then(compose(toUpper, join, unique));
同上两者相同, 都得到 Promise { 'ABC' }
Promise.then()
和 Promise.catch()
其实也就和 Validation 里面的 Success 和 Failure 里面的逻辑类似Promise 没有完全符合 ADT 的 Fantasy Land Specification (Chapter 5), 但是确实和 ADT 的行为很相似
假设有 A,B,C 三个 Promise 需要按照顺序依次执行.
比较传统的使用 for 和 reduce 来进行处理的方法:
const sleep = ms => {
return new Promise(resolve => setTimeout(resolve, ms))
}
for(let i = 0;i <= 10;i++){
await sleep(1000);
console.log(new Date(), i);
}
[delay("a", 500), delay("b", 100), delay("c", 200)].reduce(
(chain, next) => chain.then(() => next).then(::console.log),
Promise.resolve()
);
MDN: for await……of
使用 for await……of
来处理的方法:
Note:
for await……of
doesn't work with async iterators that are not async iterables.
var txt = "";
const array = [1,2,3,4,5];
async function test() {
for await (const p of array) {
var txt = p + "</br>";
document.write(txt);
}
}
test();
需要注意的是 for await……of
需要一个对象拥有一个 function-valued symbol property Symbol.asyncIterator
, 因此可以如此设计一个对象用于 for await……of
const LIMIT = 3;
const asyncIterable = {
[Symbol.asyncIterator]() { /* 必须要拥有这个属性才能使用 for await……of */
let i = 0;
return {
next() {
const done = i === LIMIT;
const value = done ? undefined : i++;
return Promise.resolve({ value, done });
},
return() {
// This will be reached if the consumer called 'break' or 'return' early in the loop.
return { done: true };
}
};
}
};
(async () => {
for await (const num of asyncIterable) {
console.log(num);
}
})();
// 0
// 1
// 2
var tasks = [
["a", 500],
["b", 100],
["c", 200],
];
function delay(value, time) {
return new Promise((resolve) => {
console.log(value);
setTimeout(resolve, time, value);
});
}
function delayedIterator(tasks) {
return {
async next() {
/* 每一次从任务中取出最顶上一个, 需要注意的是这里 task 会被缓存。直到全部取完之返回一个 done:true */
if (tasks.length) {
const [value, time] = tasks.shift();
return await delay(value, time);
}
return Promise.resolve({ done: true });
},
};
}
var it = delayedIterator(tasks);
/* 多次执行之后直到最后一次会得到 done */
await it.next().then(({ value, done }) => { value; done; }); /* a */
await it.next().then(({ value, done }) => { value; done; }); /* b */
await it.next().then(({ value, done }) => { value; done; }); /* c */
await it.next().then(({ value, done }) => { value; done; }); /* done */
以及一个更简洁简洁的例子:
var tasks = [ ["a", 500], ["b", 100], ["c", 200], ];
function delay(value, time) {
return new Promise((resolve) => {
// console.log(value);
setTimeout(resolve, time, value);
});
}
const delayedIterator = (task) => () => ({
async next() {
if (tasks.length) {
const [value, time] = tasks.shift();
await delay(value, time);
return { done: false, value };
}
return Promise.resolve({ done: true });
},
});
var delayedIterable = { [Symbol.asyncIterator]: delayedIterator(tasks) };
for await (const value of delayedIterable) {
console.log(value);
}
上一节正好有一个例子可以参考
关于 Iterables
和 Iterator Protocol
:
Symbol.iterator
元素来决定遍历的细节.Symbol.iterator
可以是一个简单的函数或者是一个 Generator对于 Iterator
需要满足一些协议:
done: boolean
和 value: any
class Block {
index = 0;
constructor(index, previousHash, data = [], difficulty = 0) {
this.index = index;
this.previousHash = previousHash;
this.data = data;
this.nonce = 0;
this.difficulty = difficulty;
this.timestamp = Date.now();
this.hash = this.calculateHash();
}
//……
[Symbol.iterator]() {
return this.data[Symbol.iterator]();
}
}
for (const transaction of block) {
// do something with transaction
}
[Symbol.iterator]
其实是个普遍存在的属性, Array/Map/Set 以及 String 都有这个属性
"Joy of JavaScript"[Symbol.iterator]; // [Function: [Symbol.iterator]]
for(const letter of "Joy of JavaScript") {
console.log(letter);
}
需要注意的是:
Generator
是一个函数, 并且不能用 Arrow Function 来定义. (也许未来版本 ES 会支持)
function* sayIt() {
yield "The";
yield "Joy";
yield "of";
yield "JavaScript!";
}
const it = sayIt();
it.next(); // { value: 'The', done: false }
it.next(); // { value: 'Joy', done: false }
/* …… */
// 当然也可以使用 `for` 循环来遍历
/*
const it = sayIt();
for(const message of sayIt()) {
console.log(message);
}
*/
在 Class 中的形式:
class SomeClass {
*sayIt() {
return "The Joy of JavaScript!";
}
}
先看一个简单版的例子:
const obj = {
*[Symbol.iterator]() {
yield 1
yield 2
yield 3
}
}
/* 其实这里使用 destructuring assignment 执行了三次 yield */
const [a, b, c] = obj
console.log(a,b,c); // 1,2,3
其核心在于 yield 了多次, 使用 destructuring assignment 一次性将值全部取出来, 下面还有一个稍微复杂一点的例子:
class Validation {
#val;
/* …… */
*[Symbol.iterator]() {
yield this.isFailure ? Failure.of(this.#val) : undefined;
yield this.isSuccess ? Success.of(this.#val) : undefined;
}
}
/* 第一个元素和 Success 的情况无关可以忽略 */
const [, right] = Success.of(2);
right.isSuccess; // true
right.get(); // 2
/* 第二个元素 right 和 Failure 的情况无关也可以忽略 */
const [left, /* right */] = Failure.of(new Error("Error occurred!"));
left.isFailure; // true
left.getOrElse(5); // 5
for await……of
来遍历async function* generateBlocksFromFile(file) { /* 'async function*' 定义一个异步迭代器 */
try {
const dataStream = fs.read(file);
let previousDecodedData = "";
for await (const chunk of dataStream) {
previousDecodedData += chunk;
let separatorIndex;
while ((separatorIndex = previousDecodedData.indexOf(";")) >= 0) {
const decodedData = previousDecodedData.slice(0, separatorIndex + 1);
const blocks = tokenize(";", decodedData)
.filter((str) => str.length > 0)
.map((str) => str.replace(";", ""));
for (const block of blocks) {
/* 对每个 chunk 进行 yeild */
yield JSON.parse(block);
}
}
}
if (previousDecodedData.length > 0) {
/* 对多种情况进行 yeild */
yield JSON.parse(previousDecodedData);
}
} catch (e) {
console.error(`Error processing file: ${e.message}`);
throw e;
}
}
let result = 0;
/* 然后通过循环就可以逐步执行 Promise */
for await (const block of generateBlocksFromFile("blocks.txt")) {
console.log("Counting block", block.hash);
result++;
previousDecodedData = previousDecodedData.slice(separatorIndex + 1);
}
result; // 3, 最终进行了 3 个部分的 await
EventEmitter 需要通过 Node.js 的
events
模块来使用
const EventEmitter = require('events');
const myEmitter = new EventEmitter();
myEmitter.on("some_event", () => {
console.log("An event occurred!");
});
myEmitter.emit("some_event");
const {EventEmitter} = require('events');
class PushArray extends Array {
static EVENT_NAME = "new_value";
#eventEmitter = new EventEmitter();
constructor(……values) {
super(……values);
}
push(value) {
this.#eventEmitter.emit(PushArray.EVENT_NAME, value);
return super.push(value);
}
subscribe({ next }) {
this.#eventEmitter.on(PushArray.EVENT_NAME, (value) => {
next(value);
});
}
unsubscribe() {
this.#eventEmitter.removeAllListeners(PushArray.EVENT_NAME);
}
}
const pushArray = new PushArray(1, 2, 3);
pushArray.subscribe({
next(value) {
console.log("New value:", value); // do something with value
},
});
pushArray.push(4);
pushArray.push(5);
pushArray.unsubscribe();
pushArray.push(6);
An Observable object is designed to model a lazy, unidirectional, push-based data source (such as streams).
Observable 暂时没有 JS 的原生实现, 一般通过
RxJS
或者core-js
进行使用, 它具有一些特性:
一个 Observable 的大体模板:
const Observer = {
next(event) {
// Receives each event in the stream
},
error(e) {
// Triggered when an exception occurs somewhere along the observable
},
complete() {
// Called when there are no more values to emit; not called on error
},
};
需要提前安装
core-js
:npm install core-js -S
import 'core-js/features/observable/index.js'
function newRandom(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
}
/* 定义一个 Observable */
const randomNum$ = new Observable((observer) => {
const _id = setInterval(() => {
observer.next(newRandom(1, 10));
}, 1_000);
return () => {
clearInterval(_id);
};
});
const subs = randomNum$.subscribe({
next(number) {
console.log("New random number:", number);
},
complete() {
console.log("Stream ended");
},
});
// some time later……
subs.unsubscribe();
下面来实现针对 Observable 的一些方法
import 'core-js/features/observable/index.js'
/**
* @name curry
* @see https://github.com/JoyOfJavaScript/joj/blob/7f231147029df787bd9eb510f42354adc7461ac0/src/blockchain/src/util/fp/combinators.js
*/
export const curry = fn => (……args1) =>
args1.length === fn.length
? fn(……args1)
: (……args2) => {
const args = [……args1, ……args2]
return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
}
/**
* @name map
* @description 给 Observable 添加一个 map 的方法
* @param fn {Function} map handler
* @param stream {object (Observable)} 接受一个 Observable
*/
const map = curry( /* 使用 curry 以方便后期的 composition */
(fn, stream) =>
/* 返回一个 New Observable */
new Observable(observer => {
/* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
next(value) {
try {
observer.next(fn(value));
}
catch (err) {
observer.error(err);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
}
});
return () => subs.unsubscribe(); /* 返回一个 用于 unSubscribe 的匿名 function */
})
);
map()
返回了一个对 Observable(stream)
进行 subscribe 的另一个新的 Observable(New Observable)
对象const square = num => num ** 2;
map(
square,
Observable.of(1, 2, 3, 4)
/* map 会返回一个 New Observable, subscribe 这个 New Observable 就可以 subscribe stream 指向的 Observable */
).subscribe({
next(number) {
console.log(number);
},
complete() {
console.log('Stream ended');
}
});
// Prints 1 4 9 16
同样的最终的目的是使得代码可以连续使用:
const square = num => num ** 2;
const add = curry((x, y) => x + y);
const subs = map(square, map(add(1), Observable.of(1, 2, 3)))
.subscribe({
next(number) {
console.log(number);
},
complete() {
console.log('Stream ended');
}
})
// Prints: 4 9 16
如此创造了一个 dataStream:
Observable.of()
得到数据: 1,2,3add(1)
处理后得到数据: 2,3,4square
处理后得到数据, 4,9,16const filter = curry(
(predicate, stream) =>
new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (predicate(value)) {
/* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
})
);
const reduce = curry((accumulator, initialValue, stream) => {
let result = initialValue ?? {};
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
result = accumulator(result, value);
},
error(e) {
observer.error(e);
},
complete() {
observer.next(result);
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
const skip = curry((count, stream) => {
let skipped = 0;
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (skipped++ >= count) {
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
import 'core-js/features/observable/index.js'
const square = num => num ** 2;
const isEven = num => num % 2 === 0;
export const curry = fn => (……args1) =>
args1.length === fn.length
? fn(……args1)
: (……args2) => {
const args = [……args1, ……args2]
return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
}
const filter = curry(
(predicate, stream) =>
new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (predicate(value)) {
/* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
})
);
const add = curry((x, y) => x + y);
const skip = curry((count, stream) => {
let skipped = 0;
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (skipped++ >= count) {
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
const map = curry( /* 使用 curry 以方便后期的 composition */
(fn, stream) =>
/* 返回一个 New Observable */
new Observable(observer => {
/* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
next(value) {
try {
observer.next(fn(value));
}
catch (err) {
observer.error(err);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
}
});
return () => subs.unsubscribe(); /* 返回一个 用于 unSubscribe 的匿名 function */
})
);
const reduce = curry((accumulator, initialValue, stream) => {
let result = initialValue ?? {};
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
result = accumulator(result, value);
},
error(e) {
observer.error(e);
},
complete() {
observer.next(result);
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
const obs = Observable.of(1, 2, 3, 4);
reduce( /* Step4: reduce with add, 最终得到一个 Observable 里面数据包含 20 */
add,
0,
map(
square, /* Step3: 平方, 得到 4, 16 */
filter(
isEven, /* Step2: 仅仅筛选双数, 保留 2,4 */
skip(1, obs) /* Step1: 跳过第一个, 返回一个 Observable 里面包含数据 2,3,4 */
)
)
)
.subscribe({
next(value) {
console.log(value);
},
complete() {
// done();
}
});
上方的语法还是有一些难懂, 我们希望可以使用 chained statement 来进行简化. 这个时候就可以设计一个 Mixin:
export const ReactiveExtensions = {
/* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
filter(predicate) {
return filter(predicate, this);
},
map(fn) {
return map(fn, this);
},
skip(count) {
return skip(count, this);
},
reduce(accumulator, initialValue = {}) {
return reduce(accumulator, initialValue, this);
},
};
/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);
/* Step1: 生成一个 Observable */
Observable.of(1, 2, 3, 4)
.skip(1) /* Step2: 调用 Observable 的 skip 方法, 这个方法会返回一个 New Observable 并且在 New Observable 里面对当前调用的这个 Observable 进行 subscribe */
.filter(isEven) /* Step3: 同上, 新建第二个 New Observable 并且对第一个 Observable 进行 subscriber */
.map(square) /* Step4: 新建第三个 New Obs 并且对 第二个进行 subscribe */
.reduce(add, 0) /* 如此循环最终返回一个 Obs, 并且最后进行 subscribe */
.subscribe({
next(value) {
console.log(value);
},
});
上方都是使用 Observable.of()
来创造一个 Observable, 我们可以扩展一个 Observable.from(generator)
的方法通过 Generator 来构造一个 Observable
import 'core-js/features/observable/index.js'
/* node 16 需要额外的库来支持 readable 方法 */
import {Readable} from 'readable-stream'
Object.defineProperty(Observable, 'fromGenerator', {
value(generator) {
return new Observable(observer => {
Readable.from(generator)
.on('data', (x) => observer.next(x))
.on('end', (x) => observer.complete(x));
});
},
enumerable: false,
writable: false,
configurable: false
});
/* 另外也支持 async 的 generator (async) */
function* words() {
yield 'The';
yield 'Joy';
yield 'of';
yield 'JavaScript';
}
Observable.fromGenerator(words())
.subscribe({
next: (x)=>console.log(x),
})
let validBlocks = 0;
const chain = new Blockchain();
let skippedGenesis = false;
/* Step1: 读取一个文件 */
for await (const blockData of generateBlocksFromFile("blocks.txt")) {
/* Step2: 跳过一些内容 */
if (!skippedGenesis) {
skippedGenesis = true;
continue;
}
/* 对 Block 进行一些操作 */
const block = new Block(
blockData.index,
chain.top.hash,
blockData.data,
blockData.difficulty
);
chain.push(block);
/* validate */
if (block.validate().isFailure) {
continue;
}
validBlocks++;
}
const chain = new Blockchain();
// 将一些方法抽取出来作为 helper functions
const validateBlock = (block) => block.validate();
const isSuccess = (validation) => validation.isSuccess;
const boolToInt = (bool) => (bool ? 1 : 0);
const addBlockToChain = curry((chain, blockData) => {
const block = new Block(
blockData.index,
chain.top.hash,
blockData.data,
blockData.difficulty
);
return chain.push(block);
});
// Main Logic: 可见业务逻辑明显变得更简单
Observable.fromGenerator(generateBlocksFromFile("blocks.txt"))
.skip(1)
.map(addBlockToChain(chain))
.map(validateBlock)
.filter(prop("isSuccess"))
.map(compose(boolToInt, isSuccess))
.reduce(add, 0)
.subscribe({
next(validBlocks) {
if (validBlocks === chain.height() - 1) {
console.log("All blocks are valid!");
} else {
console.log("Detected validation error in blocks.txt");
}
},
error(error) {
console.error(error.message);
},
complete() {
console.log("Done validating all blocks!");
},
});
这一节通过设置 Object 的
Symbol.observable
属性以实现返回一个 Observable
import 'core-js/features/observable/index.js'
const Pair = (left, right) => ({
left,
right,
[Symbol.observable]() {
return new Observable(observer => {
observer.next(left);
observer.next(right);
observer.complete();
});
}
});
Observable.from(Pair(20, 30))
.subscribe({
next(value) {
console.log('Pair element: ', value);
}
});
class Blockchain {
blocks = new Map();
blockPushEmitter = new EventEmitter();
constructor(genesis = createGenesisBlock()) {
this.top = genesis;
this.blocks.set(genesis.hash, genesis);
this.timestamp = Date.now();
this.pendingTransactions = [];
}
push(newBlock) {
newBlock.blockchain = this;
this.blocks.set(newBlock.hash, newBlock);
this.blockPushEmitter.emit(EVENT_NAME, newBlock);
this.top = newBlock;
return this.top;
}
//……
/* 返回一个 Obs */
[Symbol.observable]() {
return new Observable(observer => {
for (const block of this) {
observer.next(block);
}
this.blockPushEmitter.on(EVENT_NAME, block => {
console.log('Emitting a new block: ', block.hash);
observer.next(block);
});
});
}
}
const chain = new Blockchain();
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
const subs = Observable.from(chain)
.subscribe({
next(block) {
console.log('Received block: ', block.hash);
if (block.validate().isSuccess) {
console.log('Block is valid');
}
else {
console.log('Block is invalid');
}
}
});
// …… later in time
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
subs.unsubscribe();
chain.height(); // 4
Outputs:
Received block b81e08daa89a92cc4edd995fe704fe2c5e16205eff2fc470d7ace8a1372e7de4
Block is valid
Received block 352f29c2d159437621ab37658c0624e6a7b1aed30ca3e17848bc9be1de036cfd
Block is valid
Received block 93ff8219d77be5110fa61978c0b5f77c6c8ece96dd3bba2dc6c3c4b731a724e7
Block is valid
Emitting a new block:
07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Received block 07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Block is valid
另外如果 push 一个 invalid 的 Block :
chain.push(new Block(-1, chain.top.hash, []))
会得到以下输出:
Emitting a new block:
c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Received block c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Block is invalid
这里是一个完整的将 Observable 进行动态数据流转化的例子.
obUtils.js
: 这个文件里面都是上文提到过的一些 util 方法
export const curry = fn => (……args1) =>
args1.length === fn.length
? fn(……args1)
: (……args2) => {
const args = [……args1, ……args2]
return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
}
export const filter = curry(
(predicate, stream) =>
new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (predicate(value)) {
/* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
})
);
export const skip = curry((count, stream) => {
let skipped = 0;
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
if (skipped++ >= count) {
observer.next(value);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
export const map = curry( /* 使用 curry 以方便后期的 composition */
(fn, stream) =>
/* 返回一个 New Observable */
new Observable(observer => {
/* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
next(value) {
try {
observer.next(fn(value));
}
catch (err) {
observer.error(err);
}
},
error(e) {
observer.error(e);
},
complete() {
observer.complete();
}
});
return () => subs.unsubscribe(); /* 返回一个 用于 unSubscribe 的匿名 function */
})
);
export const reduce = curry((accumulator, initialValue, stream) => {
let result = initialValue ?? {};
return new Observable((observer) => {
const subs = stream.subscribe({
next(value) {
result = accumulator(result, value);
},
error(e) {
observer.error(e);
},
complete() {
observer.next(result);
observer.complete();
},
});
return () => subs.unsubscribe();
});
});
export const ReactiveExtensions = {
/* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
filter(predicate) {
return filter(predicate, this);
},
map(fn) {
return map(fn, this);
},
skip(count) {
return skip(count, this);
},
reduce(accumulator, initialValue = {}) {
return reduce(accumulator, initialValue, this);
},
};
ob.js
import EventEmitter from 'events'
import 'core-js/features/observable/index.js'
import { ReactiveExtensions,curry } from'./obUtil.js'
/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);
const square = num => {
console.log('square', num);
return num ** 2
};
const isEven = num => {
console.log('isEven', num);
return num % 2 === 0
};
const ON_EVENT = "on"; /* 自定义的事件名称 */
const END_EVENT = "end";
const LOG_LABEL = `IN-STREAM`;
const LOG_LABEL_INNER = `${LOG_LABEL}:push`;
function implementsPush(obj) {
return obj
&& Symbol.iterator in Object(obj)
&& typeof obj['push'] === 'function'
&& typeof obj[Symbol.iterator] === 'function';
}
const reactivize = (obj) => {
implementsPush(obj) ||
new TypeError("Object does not implement a push protocol");
const emitter = new EventEmitter();
/*
变量 pushProxy 会被 Object.assign 扩展到返回值里面
实际上这里有用的就是一个 Proxy 的细节
因此变量名称不重要
*/
const pushProxy = new Proxy(obj, {
get(……args) {
/* 使用 spread operator 获取所有参数 */
const [target, key] =
args; /* 第一个参数是 target obj, 第二个参数是对应的 key */
if (key === "push") { /* Step6: 通过 Proxy 监听了 push 事件 */
const methodRef = target[key];
return (……capturedArgs) => {
const result = methodRef.call(
target,
……[capturedArgs]
); /* 实际执行一次 Push */
emitter.emit(ON_EVENT, ……capturedArgs); /* Step7: 此处调用一次 ON_EVENT */
return result;
};
}
return Reflect.get(……args);
},
});
/*
变量 observable 会被 Object.assign 扩展到返回值里面
实际上这里有用的就是 [Symbol.observable] 这个属性
因此变量名称也不重要
*/
const observable = {
[Symbol.observable]() {
return new Observable((observer) => {
console.group(LOG_LABEL);
/* Step8: 监听 ON_EVENT, 并进行处理, 核心在于 observer.next() */
emitter.on(ON_EVENT, (newValue) => {
console.group(LOG_LABEL_INNER);
console.log("Emitting new value: ", newValue);
observer.next(newValue);
console.groupEnd(LOG_LABEL_INNER);
});
emitter.on(END_EVENT, () => {
observer.complete();
});
for (const value of obj) {
observer.next(value);
}
return () => {
console.groupEnd(LOG_LABEL);
emitter.removeAllListeners(ON_EVENT, END_EVENT);
};
});
},
};
return Object.assign(pushProxy, observable);
};
let count = 0;
const arr$ = reactivize([1, 2, 3, 4, 5]);
const subs = Observable.from(arr$) /* Step1: 得到一个 Obs */
.filter(isEven) /* Step2: 筛选偶数 */
.map(square) /* Step3: 平方 */
.subscribe({
next(value) {
/* Step 4: 对结果进行处理 */
console.log("Received new value", value);
count += value;
},
});
//…… later in time
arr$.push(6); /* Step 5: push 一个新的元素 */
subs.unsubscribe(); /* Step 6: unsubscribe */
arr$.push(7); /* Step 7: unsubscribe 之后就不会有新的输出了 */
/*
IN-STREAM
isEven 1
isEven 2
square 2
Received new value 4
isEven 3
isEven 4
square 4
Received new value 16
isEven 5
IN-STREAM:push
Emitting new value: 6
isEven 6
square 6
Received new value 36
*/
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=18vc0foyqr7jc