前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《The Joy of Javascript》- 5 - Data

《The Joy of Javascript》- 5 - Data

作者头像
szhshp
发布2022-09-21 10:41:13
5950
发布2022-09-21 10:41:13
举报

相关文章

一本书里面内容较多, 因此分成了多篇 Post, 可以从此处看到相关文章:

Data

Linear Async Flows

首先回顾一下 JS 原生的一些特性:

  1. Event-driven—The JavaScript engine uses an event loop to constantly monitor a task queue, also known as a callback queue. When a task is detected, the event loop dequeues the task and runs it to completion.
  2. Single-threaded—JavaScript provides a single-threaded model to developers. There are no standard, language-level threading APIs to spawn new threads.
  3. Asynchronous—All modern JavaScript engines use multiple threads (managed by an internal worker pool) so that you can perform nonblocking I/O actions without blocking the main thread.

然后是 NodeJS 的一些实现

额外需要注意的是: JS 原生是单线程, 但是一些 underlying platform (Browser and NodeJS).

Promise

一个简单的 Promise

代码语言:javascript
复制
const someFutureValue = new Promise((resolve, reject) => {
  const value = doSomething()
  if (value === null) {
    reject(new Error("Ooops!"));
  }
  resolve(value);
});

someFutureValue.then(doSomethingElseWithThatValue);

Interoperability for Promise & ADT | Promise 和 ADT 的共通性

Promise 和上方的一个 ADT: Validation 很是相似, 可以看到 Promise 有一些 ADT 的特性

  1. Identity: Promise.resolve('aa').then(identity); and Promise.resolve('aa'); 两者相同, 都最终生成 Promise { 'aa' }
  2. Composition:
代码语言:javascript
复制
Promise.resolve("aabbcc").then(unique).then(join).then(toUpper);
Promise.resolve("aabbcc").then(compose(toUpper, join, unique));

同上两者相同, 都得到 Promise { 'ABC' }

  1. 并且 Promise.then()Promise.catch() 其实也就和 Validation 里面的 Success 和 Failure 里面的逻辑类似

Promise 没有完全符合 ADT 的 Fantasy Land Specification (Chapter 5), 但是确实和 ADT 的行为很相似

Async Iteration | 异步迭代

假设有 A,B,C 三个 Promise 需要按照顺序依次执行.

Traditional Loop

比较传统的使用 for 和 reduce 来进行处理的方法:

代码语言:javascript
复制
const sleep = ms => {
  return new Promise(resolve => setTimeout(resolve, ms))
}
for(let i = 0;i <= 10;i++){
    await sleep(1000);
    console.log(new Date(), i);
}
代码语言:javascript
复制
[delay("a", 500), delay("b", 100), delay("c", 200)].reduce(
  (chain, next) => chain.then(() => next).then(::console.log),
  Promise.resolve()
);
for await……of

MDN: for await……of

使用 for await……of 来处理的方法:

Note: for await……of doesn't work with async iterators that are not async iterables.

代码语言:javascript
复制
var txt = "";
const array = [1,2,3,4,5];
async function test() {
  for await (const p of array) {
      var txt = p + "</br>";
      document.write(txt);
  }
}
test();

需要注意的是 for await……of 需要一个对象拥有一个 function-valued symbol property Symbol.asyncIterator, 因此可以如此设计一个对象用于 for await……of

代码语言:javascript
复制
const LIMIT = 3;

const asyncIterable = {
  [Symbol.asyncIterator]() {    /* 必须要拥有这个属性才能使用 for await……of */
    let i = 0;
    return {
      next() {
        const done = i === LIMIT;
        const value = done ? undefined : i++;
        return Promise.resolve({ value, done });
      },
      return() {
        // This will be reached if the consumer called 'break' or 'return' early in the loop.
        return { done: true };
      }
    };
  }
};

(async () => {
  for await (const num of asyncIterable) {
    console.log(num);
  }
})();
// 0
// 1
// 2
Example
代码语言:javascript
复制
var tasks = [
  ["a", 500],
  ["b", 100],
  ["c", 200],
];

function delay(value, time) {
  return new Promise((resolve) => {
    console.log(value);
    setTimeout(resolve, time, value);
  });
}

function delayedIterator(tasks) {
  return {
    async next() {
      /* 每一次从任务中取出最顶上一个, 需要注意的是这里 task 会被缓存。直到全部取完之返回一个 done:true */
      if (tasks.length) {
        const [value, time] = tasks.shift();
        return await delay(value, time);
      }
      return Promise.resolve({ done: true });
    },
  };
}
var it = delayedIterator(tasks);

/* 多次执行之后直到最后一次会得到 done */
await it.next().then(({ value, done }) => { value; done; }); /* a */
await it.next().then(({ value, done }) => { value; done; }); /* b */
await it.next().then(({ value, done }) => { value; done; }); /* c */
await it.next().then(({ value, done }) => { value; done; }); /* done */

以及一个更简洁简洁的例子:

代码语言:javascript
复制
var tasks = [ ["a", 500], ["b", 100], ["c", 200], ];

function delay(value, time) {
  return new Promise((resolve) => {
    // console.log(value);
    setTimeout(resolve, time, value);
  });
}

const delayedIterator = (task) => () => ({
  async next() {
    if (tasks.length) {
      const [value, time] = tasks.shift();
      await delay(value, time);
      return { done: false, value };
    }
    return Promise.resolve({ done: true });
  },
});

var delayedIterable = { [Symbol.asyncIterator]: delayedIterator(tasks) };

for await (const value of delayedIterable) {
  console.log(value);
}

Stream Programming

Iterables & Iterator Protocol

上一节正好有一个例子可以参考

关于 IterablesIterator Protocol:

  • Iterables 是一个可以被枚举,遍历及循环的对象
  • 一般需要一个 Symbol.iterator 元素来决定遍历的细节.
  • Symbol.iterator 可以是一个简单的函数或者是一个 Generator

对于 Iterator 需要满足一些协议:

  • 一个方法返回至少两个参数: done: booleanvalue: any
    • 用于标记遍历操作是否已经完成以及对应的值
    • 需要注意的是 done 返回 true 的时候 value 会被忽略 (便利完成就就没有必要返回值了)
  • 如果一个遍历器没有返回上面这两个属性, 那么在遍历的时候会抛出一个错误.
Example
代码语言:javascript
复制
class Block {
  index = 0;
  constructor(index, previousHash, data = [], difficulty = 0) {
    this.index = index;
    this.previousHash = previousHash;
    this.data = data;
    this.nonce = 0;
    this.difficulty = difficulty;
    this.timestamp = Date.now();
    this.hash = this.calculateHash();
  } 
  
  //……
  [Symbol.iterator]() {
    return this.data[Symbol.iterator]();
  }
}

for (const transaction of block) { 
  // do something with transaction
}
Strings implementing @@iterator

[Symbol.iterator] 其实是个普遍存在的属性, Array/Map/Set 以及 String 都有这个属性

代码语言:javascript
复制
"Joy of JavaScript"[Symbol.iterator]; // [Function: [Symbol.iterator]]

for(const letter of "Joy of JavaScript") {
  console.log(letter);
}

Generators

需要注意的是: Generator 是一个函数, 并且不能用 Arrow Function 来定义. (也许未来版本 ES 会支持)

代码语言:javascript
复制
function* sayIt() {
  yield "The";
  yield "Joy";
  yield "of";
  yield "JavaScript!";
}



const it = sayIt();
it.next(); // { value: 'The', done: false } 
it.next(); // { value: 'Joy', done: false }
/* …… */


// 当然也可以使用 `for` 循环来遍历

/* 
const it = sayIt();
for(const message of sayIt()) { 
  console.log(message);
}
 */

在 Class 中的形式:

代码语言:javascript
复制
class SomeClass {
  *sayIt() {
    return "The Joy of JavaScript!";
  }
}
Creating iterable objects

先看一个简单版的例子:

代码语言:javascript
复制
const obj = {
  *[Symbol.iterator]() { 
    yield 1
    yield 2
    yield 3
  }
}


/* 其实这里使用 destructuring assignment 执行了三次 yield */
const [a, b, c] = obj

console.log(a,b,c);  // 1,2,3

其核心在于 yield 了多次, 使用 destructuring assignment 一次性将值全部取出来, 下面还有一个稍微复杂一点的例子:

代码语言:javascript
复制
class Validation {
  #val;
  /* …… */
  
  *[Symbol.iterator]() {
    yield this.isFailure ? Failure.of(this.#val) : undefined;
    yield this.isSuccess ? Success.of(this.#val) : undefined;
  }
}


/* 第一个元素和 Success 的情况无关可以忽略 */
const [, right] = Success.of(2);
right.isSuccess; // true
right.get(); // 2

/* 第二个元素 right 和 Failure 的情况无关也可以忽略 */
const [left, /* right */] = Failure.of(new Error("Error occurred!"));
left.isFailure; // true
left.getOrElse(5); // 5
Async Generators
  • 相比普通的 Generator 会 yield 一个值, Async Generator 会 yield 一个 Promise.
  • 后期可以使用 for await……of 来遍历
  • 适用于一个大文件的分段获取, 是一个很简单的例子,用于计算 chunk 的数目, 当然 Async Generator 的潜力很大, 可以进行一些复杂的操作, 比如对每个 chunk 进行 validate 等等
代码语言:javascript
复制
async function* generateBlocksFromFile(file) {  /* 'async function*' 定义一个异步迭代器  */
  try {
    const dataStream = fs.read(file);
    let previousDecodedData = "";
    for await (const chunk of dataStream) {
      previousDecodedData += chunk;
      let separatorIndex;
      while ((separatorIndex = previousDecodedData.indexOf(";")) >= 0) {
        const decodedData = previousDecodedData.slice(0, separatorIndex + 1);
        const blocks = tokenize(";", decodedData)
          .filter((str) => str.length > 0)
          .map((str) => str.replace(";", ""));
        for (const block of blocks) {
          /* 对每个 chunk 进行 yeild */
          yield JSON.parse(block);
        }
      }
    }
    if (previousDecodedData.length > 0) {
      /* 对多种情况进行 yeild */
      yield JSON.parse(previousDecodedData);
    }
  } catch (e) {
    console.error(`Error processing file: ${e.message}`);
    throw e;
  }
}






let result = 0;

/* 然后通过循环就可以逐步执行 Promise */
for await (const block of generateBlocksFromFile("blocks.txt")) {
  console.log("Counting block", block.hash);
  result++;
  previousDecodedData = previousDecodedData.slice(separatorIndex + 1);
}
result; // 3, 最终进行了 3 个部分的 await

EventEmitter

Basic Usage of EventEmitter

EventEmitter 需要通过 Node.js 的 events 模块来使用

代码语言:javascript
复制
const EventEmitter = require('events');

const myEmitter = new EventEmitter();
myEmitter.on("some_event", () => {
  console.log("An event occurred!");
});
myEmitter.emit("some_event");
Extend Array with EventEmitter
代码语言:javascript
复制
const {EventEmitter} = require('events'); 

class PushArray extends Array {
  static EVENT_NAME = "new_value";

  #eventEmitter = new EventEmitter();

  constructor(……values) {
    super(……values);
  }

  push(value) {
    this.#eventEmitter.emit(PushArray.EVENT_NAME, value);
    return super.push(value);
  }

  subscribe({ next }) {
    this.#eventEmitter.on(PushArray.EVENT_NAME, (value) => {
      next(value);
    });
  }

  unsubscribe() {
    this.#eventEmitter.removeAllListeners(PushArray.EVENT_NAME);
  }
}

const pushArray = new PushArray(1, 2, 3);
pushArray.subscribe({
  next(value) {
    console.log("New value:", value); // do something with value
  },
});
pushArray.push(4);
pushArray.push(5);
pushArray.unsubscribe();
pushArray.push(6);

Observable

An Observable object is designed to model a lazy, unidirectional, push-based data source (such as streams).

Observable 暂时没有 JS 的原生实现, 一般通过 RxJS 或者 core-js 进行使用, 它具有一些特性:

  1. Unidirectional Data Flow(Data Propagation): 数据流一定是从 Publisher 到 Subscriber .
  2. Declarative, Lazy Pipeline: 只有当 Subscriber 订阅时才会被创建.

一个 Observable 的大体模板:

代码语言:javascript
复制
const Observer = {
  next(event) {
    // Receives each event in the stream
  },
  error(e) {
    // Triggered when an exception occurs somewhere along the observable
  },
  complete() {
    // Called when there are no more values to emit; not called on error
  },
};
Example - Simple Observable

需要提前安装 core-js: npm install core-js -S

代码语言:javascript
复制
import 'core-js/features/observable/index.js'

function newRandom(min, max) {
  return Math.floor(Math.random() * (max - min)) + min;
}

/* 定义一个 Observable */
const randomNum$ = new Observable((observer) => {
  const _id = setInterval(() => {
    observer.next(newRandom(1, 10));
  }, 1_000);

  return () => {
    clearInterval(_id);
  };
});

const subs = randomNum$.subscribe({
  next(number) {
    console.log("New random number:", number);
  },
  complete() {
    console.log("Stream ended");
  },
});

// some time later…… 
subs.unsubscribe();
Observable Datastream Methods

下面来实现针对 Observable 的一些方法

map() for Observable
代码语言:javascript
复制
import 'core-js/features/observable/index.js'

/**
 * @name curry
 * @see https://github.com/JoyOfJavaScript/joj/blob/7f231147029df787bd9eb510f42354adc7461ac0/src/blockchain/src/util/fp/combinators.js
 */
export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }

/**
 * @name map
 * @description 给 Observable 添加一个 map 的方法 
 * @param fn {Function} map handler
 * @param stream {object (Observable)} 接受一个 Observable
 */
const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);
  • 这个 map() 返回了一个对 Observable(stream) 进行 subscribe 的另一个新的 Observable(New Observable) 对象
  • 对 New Observable 进行 subscribe 就可以对 stream 里面的所有元素执行迭代
代码语言:javascript
复制
const square = num => num ** 2;

map(
  square,
  Observable.of(1, 2, 3, 4) 
  /* map 会返回一个 New Observable, subscribe 这个 New Observable 就可以 subscribe stream 指向的 Observable */
).subscribe({
  next(number) {
    console.log(number);
  },
  complete() {
    console.log('Stream ended');
  }
});

// Prints 1  4  9  16

同样的最终的目的是使得代码可以连续使用:

代码语言:javascript
复制
const square = num => num ** 2;

const add = curry((x, y) => x + y);

const subs = map(square, map(add(1), Observable.of(1, 2, 3)))
  .subscribe({
    next(number) {
      console.log(number);
    },
    complete() {
      console.log('Stream ended');
    }
  })
// Prints: 4  9  16

如此创造了一个 dataStream:

  1. Observable.of() 得到数据: 1,2,3
  2. add(1) 处理后得到数据: 2,3,4
  3. square 处理后得到数据, 4,9,16
filter() for Observable
代码语言:javascript
复制
const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {  
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);
reduce() for Observable
代码语言:javascript
复制
const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});
skip() for Observable
代码语言:javascript
复制
const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});
Example - Datastream with Observable
代码语言:javascript
复制
import 'core-js/features/observable/index.js'
const square = num => num ** 2;
const isEven = num => num % 2 === 0;


export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }


const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);

const add = curry((x, y) => x + y);

const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);

const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

const obs = Observable.of(1, 2, 3, 4);


reduce( /* Step4: reduce with add, 最终得到一个 Observable 里面数据包含 20 */
  add,
  0,
  map(
    square, /* Step3: 平方, 得到 4, 16 */
    filter(
      isEven, /* Step2: 仅仅筛选双数, 保留 2,4 */
      skip(1, obs)   /* Step1: 跳过第一个, 返回一个 Observable 里面包含数据 2,3,4 */
    )
  )
)
  .subscribe({
    next(value) {
      console.log(value);
    },
    complete() {
      // done();
    }
  });
Observable as Mixin

上方的语法还是有一些难懂, 我们希望可以使用 chained statement 来进行简化. 这个时候就可以设计一个 Mixin:

代码语言:javascript
复制
export const ReactiveExtensions = {

  /* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
  filter(predicate) {
    return filter(predicate, this);
  },
  map(fn) {
    return map(fn, this);
  },
  skip(count) {
    return skip(count, this);
  },
  reduce(accumulator, initialValue = {}) {
    return reduce(accumulator, initialValue, this);
  },
};


/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);



/* Step1: 生成一个 Observable */
Observable.of(1, 2, 3, 4)
  .skip(1)  /* Step2: 调用 Observable 的 skip 方法, 这个方法会返回一个 New Observable 并且在 New Observable 里面对当前调用的这个 Observable 进行 subscribe */
  .filter(isEven) /* Step3: 同上, 新建第二个 New Observable 并且对第一个 Observable 进行 subscriber */
  .map(square) /* Step4: 新建第三个 New Obs 并且对 第二个进行 subscribe */
  .reduce(add, 0) /* 如此循环最终返回一个 Obs, 并且最后进行 subscribe */
  .subscribe({
    next(value) {
      console.log(value);
    },
  });
Representing push streams with generators

上方都是使用 Observable.of() 来创造一个 Observable, 我们可以扩展一个 Observable.from(generator) 的方法通过 Generator 来构造一个 Observable

代码语言:javascript
复制
import 'core-js/features/observable/index.js'

/* node 16 需要额外的库来支持 readable 方法 */
import {Readable} from 'readable-stream'

Object.defineProperty(Observable, 'fromGenerator', {
  value(generator) {
    return new Observable(observer => {
      Readable.from(generator)
        .on('data', (x) => observer.next(x))
        .on('end', (x) => observer.complete(x));
    });
  },
  enumerable: false,
  writable: false,
  configurable: false
});

/* 另外也支持 async 的 generator (async) */
function* words() {
  yield 'The';
  yield 'Joy';
  yield 'of';
  yield 'JavaScript';
}

Observable.fromGenerator(words())
  .subscribe({
    next: (x)=>console.log(x),
  })

Example - Stream Programming

Before
代码语言:javascript
复制
let validBlocks = 0;
const chain = new Blockchain();
let skippedGenesis = false;

/* Step1: 读取一个文件 */
for await (const blockData of generateBlocksFromFile("blocks.txt")) {
  /* Step2: 跳过一些内容 */
  if (!skippedGenesis) {
    skippedGenesis = true;
    continue;
  }
  /* 对 Block 进行一些操作 */
  const block = new Block(
    blockData.index,
    chain.top.hash,
    blockData.data,
    blockData.difficulty
  );
  chain.push(block);
  /* validate */
  if (block.validate().isFailure) {
    continue;
  }
  validBlocks++;
}
After
代码语言:javascript
复制
const chain = new Blockchain();

// 将一些方法抽取出来作为 helper functions
const validateBlock = (block) => block.validate();
const isSuccess = (validation) => validation.isSuccess;
const boolToInt = (bool) => (bool ? 1 : 0);
const addBlockToChain = curry((chain, blockData) => {
  const block = new Block(
    blockData.index,
    chain.top.hash,
    blockData.data,
    blockData.difficulty
  );
  return chain.push(block);
});

// Main Logic: 可见业务逻辑明显变得更简单
Observable.fromGenerator(generateBlocksFromFile("blocks.txt"))
  .skip(1)
  .map(addBlockToChain(chain))
  .map(validateBlock)
  .filter(prop("isSuccess"))
  .map(compose(boolToInt, isSuccess))
  .reduce(add, 0)
  .subscribe({
    next(validBlocks) {
      if (validBlocks === chain.height() - 1) {
        console.log("All blocks are valid!");
      } else {
        console.log("Detected validation error in blocks.txt");
      }
    },
    error(error) {
      console.error(error.message);
    },
    complete() {
      console.log("Done validating all blocks!");
    },
  });
Extra: Streamifying objects

这一节通过设置 Object 的 Symbol.observable 属性以实现返回一个 Observable

代码语言:javascript
复制
import 'core-js/features/observable/index.js'

const Pair = (left, right) => ({
  left,
  right,
  [Symbol.observable]() {
    return new Observable(observer => {
      observer.next(left);
      observer.next(right);
      observer.complete();
    });
  }
});

Observable.from(Pair(20, 30))
  .subscribe({
    next(value) {
      console.log('Pair element: ', value);
    }
  });
代码语言:javascript
复制
class Blockchain {
  blocks = new Map();
  blockPushEmitter = new EventEmitter();

  constructor(genesis = createGenesisBlock()) {
    this.top = genesis;
    this.blocks.set(genesis.hash, genesis);
    this.timestamp = Date.now();
    this.pendingTransactions = [];
  }

  push(newBlock) {
    newBlock.blockchain = this;
    this.blocks.set(newBlock.hash, newBlock);
    this.blockPushEmitter.emit(EVENT_NAME, newBlock);
    this.top = newBlock;
    return this.top;
  }
  //……

/* 返回一个 Obs */
  [Symbol.observable]() {
    return new Observable(observer => {
      for (const block of this) {
        observer.next(block);
      }
      this.blockPushEmitter.on(EVENT_NAME, block => {
        console.log('Emitting a new block: ', block.hash);
        observer.next(block);
      });
    });
  }
}


const chain = new Blockchain();
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
chain.push(new Block(chain.height() + 1, chain.top.hash, []));

const subs = Observable.from(chain)
  .subscribe({
    next(block) {
      console.log('Received block: ', block.hash);
      if (block.validate().isSuccess) {
        console.log('Block is valid');
      }
      else {
        console.log('Block is invalid');
      }
    }
  });

// …… later in time
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
subs.unsubscribe();
chain.height(); // 4

Outputs:

代码语言:javascript
复制
Received block b81e08daa89a92cc4edd995fe704fe2c5e16205eff2fc470d7ace8a1372e7de4
Block is valid
Received block 352f29c2d159437621ab37658c0624e6a7b1aed30ca3e17848bc9be1de036cfd
Block is valid
Received block 93ff8219d77be5110fa61978c0b5f77c6c8ece96dd3bba2dc6c3c4b731a724e7
Block is valid
Emitting a new block: 
07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Received block 07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Block is valid

另外如果 push 一个 invalid 的 Block :

代码语言:javascript
复制
chain.push(new Block(-1, chain.top.hash, []))

会得到以下输出:

代码语言:javascript
复制
Emitting a new block: 
c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Received block c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Block is invalid

Dynamic streamification

这里是一个完整的将 Observable 进行动态数据流转化的例子.

obUtils.js: 这个文件里面都是上文提到过的一些 util 方法

代码语言:javascript
复制
export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }


export const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);


export const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

export const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);

export const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

export const ReactiveExtensions = {
  /* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
  filter(predicate) {
    return filter(predicate, this);
  },
  map(fn) {
    return map(fn, this);
  },
  skip(count) {
    return skip(count, this);
  },
  reduce(accumulator, initialValue = {}) {
    return reduce(accumulator, initialValue, this);
  },
};

ob.js

代码语言:javascript
复制
import EventEmitter from 'events'
import 'core-js/features/observable/index.js'
import { ReactiveExtensions,curry } from'./obUtil.js'

/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);

const square = num => {
  console.log('square', num);
  return num ** 2
};
const isEven = num => {
  console.log('isEven', num);
  return num % 2 === 0
};

const ON_EVENT = "on";  /* 自定义的事件名称 */
const END_EVENT = "end";
const LOG_LABEL = `IN-STREAM`;
const LOG_LABEL_INNER = `${LOG_LABEL}:push`;

function implementsPush(obj) {
  return obj
    && Symbol.iterator in Object(obj)
    && typeof obj['push'] === 'function'
    && typeof obj[Symbol.iterator] === 'function';
}

const reactivize = (obj) => {
  implementsPush(obj) ||
    new TypeError("Object does not implement a push protocol");

  const emitter = new EventEmitter();

  /* 
    变量 pushProxy 会被 Object.assign 扩展到返回值里面
    实际上这里有用的就是一个 Proxy 的细节
    因此变量名称不重要
  */
  const pushProxy = new Proxy(obj, {
    get(……args) {
      /*  使用 spread operator 获取所有参数 */
      const [target, key] =
        args; /* 第一个参数是 target obj, 第二个参数是对应的 key */
      if (key === "push") {   /* Step6: 通过 Proxy 监听了 push 事件 */
        const methodRef = target[key];
        return (……capturedArgs) => {
          const result = methodRef.call(
            target,
            ……[capturedArgs]
          ); /* 实际执行一次 Push */
          emitter.emit(ON_EVENT, ……capturedArgs); /* Step7: 此处调用一次 ON_EVENT */
          return result;
        };
      }
      return Reflect.get(……args);
    },
  });

  /* 
    变量 observable 会被 Object.assign 扩展到返回值里面
    实际上这里有用的就是 [Symbol.observable] 这个属性
    因此变量名称也不重要
  */
  const observable = {
    [Symbol.observable]() {
      return new Observable((observer) => {
        console.group(LOG_LABEL);

        /* Step8: 监听 ON_EVENT, 并进行处理, 核心在于 observer.next() */
        emitter.on(ON_EVENT, (newValue) => {
          console.group(LOG_LABEL_INNER);
          console.log("Emitting new value: ", newValue);
          observer.next(newValue);
          console.groupEnd(LOG_LABEL_INNER);
        });
        emitter.on(END_EVENT, () => {
          observer.complete();
        });
        for (const value of obj) {
          observer.next(value);
        }
        return () => {
          console.groupEnd(LOG_LABEL);
          emitter.removeAllListeners(ON_EVENT, END_EVENT);
        };
      });
    },
  };

  return Object.assign(pushProxy, observable);
};


let count = 0;
const arr$ = reactivize([1, 2, 3, 4, 5]);
const subs = Observable.from(arr$)  /* Step1: 得到一个 Obs */
  .filter(isEven)  /* Step2: 筛选偶数 */
  .map(square) /* Step3: 平方 */
  .subscribe({
    next(value) {
      /* Step 4: 对结果进行处理 */
      console.log("Received new value", value);
      count += value;
    },
  });
//…… later in time
arr$.push(6); /* Step 5: push 一个新的元素 */
subs.unsubscribe();  /* Step 6: unsubscribe */
arr$.push(7); /* Step 7: unsubscribe 之后就不会有新的输出了 */


/* 

IN-STREAM
  isEven 1
  isEven 2
  square 2
  Received new value 4
  isEven 3
  isEven 4
  square 4
  Received new value 16
  isEven 5
  IN-STREAM:push
    Emitting new value:  6
    isEven 6
    square 6
    Received new value 36

*/

Summary

  • An Iterator object has the method next, which returns an object with properties value and done. value contains the next element in the iteration, and done is the control switch that stops the iteration process.
  • An async iterator follows the same behavior as a normal iterator except that next returns a Promise with a result of the same shape {value, done}.
  • To build custom enumerable objects, you can implement Symbol.iterator. You can also define Symbol.asyncIterator to enumerate the pieces of your objects asynchronously.
  • Generators are a special type of function that can produce a sequence of values instead of a single value—a factory for iterables. A generator function is identified by an asterisk (*).
  • A generator function returns a Generator object that implements the iterator protocol, which means you can consume it by using the for……of loop.
  • The difference between a normal generator and an async generator is that generated values are wrapped by a Promise. To consume an async generator, you can use the for await……of loop.
  • Streams are sequences of values emitted over time. Anything can become a stream, such as a single value, an array, or a generator function. Anything that is iterable can be modeled as a stream.
  • The new Observable API proposes to make stream-based, reactive programming easier.
  • Observables are push-based, declarative streams. Their programming model is based on publish/subscribe. Observables are agnostic to the type of data in the sequence and to whether the data is synchronous or asynchronous; the programming model is the same.
  • You can create and augment your own observable objects by implementing a function-valued Symbol.observable property.

Extra

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=18vc0foyqr7jc

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-07-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 相关文章
  • Data
    • Linear Async Flows
      • Promise
        • Interoperability for Promise & ADT | Promise 和 ADT 的共通性
          • Async Iteration | 异步迭代
            • Traditional Loop
            • for await……of
        • Stream Programming
          • Iterables & Iterator Protocol
            • Example
            • Strings implementing @@iterator
          • Generators
            • Creating iterable objects
            • Async Generators
          • EventEmitter
            • Basic Usage of EventEmitter
            • Extend Array with EventEmitter
          • Observable
            • Example - Simple Observable
            • Observable Datastream Methods
            • Example - Datastream with Observable
            • Observable as Mixin
            • Representing push streams with generators
          • Example - Stream Programming
            • Before
            • After
            • Extra: Streamifying objects
        • Dynamic streamification
        • Summary
        • Extra
        相关产品与服务
        云开发 CloudBase
        云开发(Tencent CloudBase,TCB)是腾讯云提供的云原生一体化开发环境和工具平台,为200万+企业和开发者提供高可用、自动弹性扩缩的后端云服务,可用于云端一体化开发多种端应用(小程序、公众号、Web 应用等),避免了应用开发过程中繁琐的服务器搭建及运维,开发者可以专注于业务逻辑的实现,开发门槛更低,效率更高。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档