Node中的流

一.流的概念

stream是数据集合,与数组、字符串差不多。但stream不一次性访问全部数据,而是一部分一部分发送/接收(chunk式的),所以不必占用那么大块内存,尤其适用于处理大量(外部)数据的场景

stream具有管道(pipeline)特性,例如:

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

很多原生模块都是基于stream的,包括进程的stdin/stdout/stderr

例如常见的场景:

const fs = require('fs');
const server = require('http').createServer();server.on('request', (req, res) => {
 const src = fs.createReadStream('./big.file');
 src.pipe(res);
});server.listen(8000);

其中pipe方法把可读流的输出(数据源)作为可写流的输入(目标),直接把读文件的输出流作为输入连接到HTTP响应的输出流,从而避免把整个文件读入内存

P.S.甚至日常使用的console.log()内部实现也是stream

二.流的类型

Node中有4种基础流:

  • Readable 可读流是对源的抽象, 从中可以消耗数据,如fs.createReadStream
  • Writable 可写流是对可写入数据的目标的抽象,如fs.createWriteStream
  • Duplex(双工) 双工流既可读又可写,如TCP socket
  • Transform(转换) 转换流本质上是双工流,用于在写入和读取数据时对其进行修改或转换,如zlib.createGzip用gzip压缩数据 转换流看一看做一个输入可写流,输出可读流的函数 P.S.有一种转换流叫(Pass)Through Stream(通过流),类似于FP中的identity = x => x

三.管道

src.pipe(res)要求源必须可读,目标必须可写,所以,如果是对双工流进行管道传输,就可以像Linux的管道一样链式调用:

readableSrc
 .pipe(transformStream1)
 .pipe(transformStream2)
 .pipe(finalWrtitableDest)

pipe()方法返回目标流,所以:

// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux下,等价于
$ a | b | c | d

四.流与事件

事件驱动是Node在设计上的一个重要特点,很多Node原生对象都是基于事件机制(EventEmitter模块)实现的,包括流(stream模块):

Most of Node’s objects — like HTTP requests, responses, and streams — implement the EventEmitter module so they can provide a way to emit and listen to events.

所有stream都是EventEmitter实例,通过事件机制来读写数据,例如上面提到的pipe()方法相当于:

// readable.pipe(writable)readable.on('data', (chunk) => {
 writable.write(chunk);
});
readable.on('end', () => {
 writable.end();
});

P.S.pipe还处理了一些别的事情,比如错误处理,EoF以及某个流的速度较快/较慢的情况

Readable与Writable stream的主要事件和方法如下:

Readable的主要事件有:

  • data事件:stream把一个chunk传递给使用者时触发
  • end事件:再没有要从stream中获取(consume)的数据时触发

Writable的主要事件有:

  • drain事件,断流了,这是Writable stream可以接收更多数据的信号
  • finish事件,当所有数据都已flush到下层系统时触发

五.Readable stream的两种模式:Paused与Flowing

一个Readable stream要么流动(Flowing)要么暂停(Paused),也被称为拉(pull)和推(push)两种模式

创建出来后默认处于Paused状态,可以通过read()方法读取数据。如果处于Flowing状态,数据会持续地流出来,此时只需要通过监听事件来使用这些数据,如果没有使用者的话,数据会丢失,所以都会监听Readable stream的data事件,实际上监听data事件会把Readable stream从Paused状态切换到Flowing,移除data事件监听会再切回来。需要手动切换的话,可以通过resume()pause()来做

使用pipe()方式时不用关心这些,都会自动处理妥当:

  1. Readable触发data事件,直到Writable忙不过来了
  2. pipe收到信号后调用Readable.pause(),进入Paused模式
  3. Writable再干一会儿压力不大了的时候,会触发drain事件,此时pipe调用Readable.resume()进入Flowing模式,让Readable接着触发data事件

highWaterMark与backpressure

其实drain事件就是用来应对Backpressure现象的,简单的说,Backpressure就是下游的消费速度限制了传输,造成下游向上游的反向压力

如果消费速度慢于生产速度,会在下游产生堆积,来不及处理的数据会存放到Writable的buffer里,如果不加(限流)处理,这个buffer会持续增长,可能溢出进而造成错误或数据丢失

Backpressure现象发生的标志是Writable.write()返回了false,说明来自上游的待处理数据量已经触及highWaterMark(高水位线,默认16kb):

Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

这是下游开始有点紧张了(todo项足够忙一阵子了)的信号。建议在此时对上游限流,即调用Readable.pause()先给停了,给下游多点时间处理堆积的数据,下游觉得轻松了会触发darin事件,表示此时有能力处理更多数据了,所以这时候应该开闸放水(Readable.resume()

注意,Readable的数据会存放在缓存中,直到有个Writable来消耗这些数据。所以Paused状态只是说不往下流了,已经缓存的数据还在Readable的buffer里。所以如果不限流,来不及处理的数据就缓存在下游,并持续堆积,限流的话,这部分数据被缓存在上游,因为限流了而不再持续堆积

另外,Readable也有highWaterMark的概念:

The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

是对从实际数据源读取速度的限制(比如从磁盘读文件),防止生产速度太快引发缓存堆积(比如一顿猛push())。所以Flowing Readable的正常工作方式是被push()push()push()…诶,发现buffer里的量已经攒够一个chunk了,吐给下游。同样,Readable触及highWaterMark的标志是push()返回false,说明Readable的buffer不那么十分空了,此时如果还持续push(),没错,也会出现BackPressure(Readable消费能力限制了从数据源到Readable的传输速度):

  快-------------慢
数据源-------->Readable------->Writable
                快--------------慢

只要上游(生产)快,下游(消费)慢就会出现BackPressure,所以在readable.pipe(writable)的简单场景,可能会出现上面两段BackPressure

六.示例

Writable stream

常见的造大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');for(let i=0; i<= 1e6; i++) {
 file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}file.end();

通过fs.createWriteStream()创建指向文件的Writable stream,通过write()填充数据,写完后end()

或者更一般的,直接new一个Writable

const { Writable } = require('stream');
const outStream = new Writable({
 write(chunk, encoding, callback) {
   console.log(chunk.toString());
   // nowrap version
   // process.stdout.write(chunk.toString());
   callback();
 }
});process.stdin.pipe(outStream);

一个最简单的echo实现,把当前进程的标准输入接到自定义输出流outStream,像日志中间件一样(标准输入流经outStream,再该干嘛干嘛去callback):

cc
oo
nn
ss
oo
ll
eeConsole {
 log: [Function: bound consoleCall],
 ...
}

write()方法的3个参数中,chunk是个Buffer,encoding在某些场景下需要,大多数时候可以忽略,callback是应该在chunk处理完毕后调用的通知函数,表明写入成功与否(失败的话,传Error对象进去),类似于尾触发机制中的next()

或者更简单的echo实现:

process.stdin.pipe(process.stdout);

直接把标准输入流连接到标准输出流

Readable stream

const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

通过push向Readable stream里填充数据,push(null)表示结束。上例中把所有数据都读进来,然后才交给标准输出,实际上有更高效的方式(按需推数据给使用者):

const { Readable } = require('stream');
const inStream = new Readable({
 read(size) {
   this.push(String.fromCharCode(this.currentCharCode++));
   if (this.currentCharCode > 90) {
     this.push(null);
   }
 }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read()方法每次吐一个字符,使用者从Readable stream取数据的时候,read()会持续触发

Duplex/Transform stream

Duplex stream兼具Readable和Writable的特点:既可以作为数据源(生产者),也可以作为目标(消费者)。例如:

const { Duplex } = require('stream');const inoutStream = new Duplex({
 write(chunk, encoding, callback) {
   console.log(chunk.toString());
   callback();
 }, read(size) {
   this.push(String.fromCharCode(this.currentCharCode++));
   if (this.currentCharCode > 90) {
     this.push(null);
   }
 }
});inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

上例把前2个例子结合起来了,inoutStream被连接到标准输出流了,A-Z会作为数据源传递给标准输出(打印出来),同时标准输入流被接到inoutStream,来自标准输入的所有数据会被log出来,效果如下:

ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
eeConsole {
 log: [Function: bound consoleCall],
 ...
}

P.S.先输出A-Z是因为pipe()会把Readable stream切换到Flowing模式,所以一开始就把A-Z“流”出来了

注意,Duplex stream的Readable与Writable部分是完全独立的,读写互不影响,Duplex只是把两个特性组合成一个对象了,就像两根筷子一样绑在一起的单向管道

Transform stream是一种有意思的Duplex stream:其输出是根据输入计算得来的。所以不用分别实现read/write()方法,只实现一个transform()方法就够了:

const { Transform } = require('stream');const upperCaseTr = new Transform({
 // 函数签名与write一致
 transform(chunk, encoding, callback) {
   this.push(chunk.toString().toUpperCase());
   callback();
 }
});process.stdin.pipe(upperCaseTr).pipe(process.stdout);

同样,Transform stream的Readable与Writable部分也是独立的(不手动push就不会自动传递到Readable部分),只是形式上结合起来了

P.S.另外,stream之间除了可以传递Buffer/String,还可以传递Object(包括Array),具体见Streams Object Mode

Node提供了一些原生Transform stream,例如zlibcrypto stream:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];fs.createReadStream(file)
 .pipe(zlib.createGzip())
 .pipe(fs.createWriteStream(file + '.gz'));

简单的命令行工具,gzip压缩。更多示例见Node’s built-in transform streams

参考资料

  • Node.js Streams: Everything you need to know
  • Node.js writable.write return false?
  • 探究 Node.js 中的 drain 事件
  • 深入理解 Node.js Stream 内部机制
  • Backpressuring in Streams

本文分享自微信公众号 - 前端向后(backward-fe)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-02-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏机器学习入门

LeetCode Weekly Contest 31解题思路

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.n...

8840
来自专栏运维前线

如何在Linux上安装Node.js

版权声明:本文为木偶人shaon原创文章,转载请注明原文地址,非常感谢。 https://b...

20920
来自专栏机器学习入门

LeetCode Weekly Contest 37解题思路

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.n...

9430
来自专栏机器学习入门

LeetCode Weekly Contest 36解题思路

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.n...

7630
来自专栏素质云笔记

深度学习在推荐领域的应用

用户基础数据:年龄、性别、公司、邮箱、地点、公司等。 关系图:根据人↔人,人↔微博的关注、评论、转发信息建立关系图。 内容数据:用户的微博内容,包含文字...

8330
来自专栏运维前线

实战CentOS系统部署Hadoop集群服务

版权声明:本文为木偶人shaon原创文章,转载请注明原文地址,非常感谢。 https://blog...

10730
来自专栏机器学习入门

算法细节系列(31):链表

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.n...

9840
来自专栏运维前线

程序语言(python、php、Node.js)调用Redis

版权声明:本文为木偶人shaon原创文章,转载请注明原文地址,非常感谢。 https://b...

11840
来自专栏运维前线

CentOS 7.2 部署Node.js开发环境

版权声明:本文为木偶人shaon原创文章,转载请注明原文地址,非常感谢。 https://b...

25220
来自专栏机器学习入门

LeetCode Weekly Contest 46解题思路

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.n...

8830

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励