专栏首页YuanXinNodeJS模块研究 - stream

NodeJS模块研究 - stream

构建复杂程序的时候,通常会将系统拆解成若干功能,这些功能的之间的接口遵循一定的规范,以实现组合连接,共同完成复杂任务。例如管道运算符 |

在 nodejs 中,实现各种功能,总避免不了和“数据”打交道,这些数据可能是 Buffer、字符串、数组等等。但当处理大量数据的时候,如何保证程序的稳健性?如何不让内存爆掉呢?nodejs 提供了 stream 模块,来让开发者更优雅地处理数据。这需要开发者理解“流”的含义,遵循“流”的相关规范,来进行“流”编程。

相较于其他的模块系列,需要理解的概念偏多。本文主要从以下几个方面深入 stream 模块:

  • 什么是“流”?
  • 流有哪几种类型?
  • 内部缓冲的作用?
  • 流动模式 vs 暂停模式
  • 背压问题
    • 如何产生的?
    • 如何解决背压问题?
  • 如何定制流
    • 实现可写流
    • 实现可读流
    • 实现双工和转换流

什么是“流”?

流是数据的集合。但它不一定是一次性全部读入内存的,这和程序中的变量不同。举个例子,一个 100GB 的文件,可以通过 fs.createReadStream() 来将文件二进制一点点读入,每次读入的“一点点二进制”,就是流。

乍一看,这样做好像并没有什么显而易见的好处。一点点读入的流,还要一点点处理。但是,处理难度的提高换来的是程序性能提升。如果通过fs.readFile()一次性将 100GB 大文件读入内存,那么可能会发生以下几个问题:

  • 内存错误。本机内存不够,或者超过了 nodejs 的内存限制。
  • 程序性能下降。过高的内存占用可能会导致频繁触发 GC,影响其他程序的运行。

借助流,每次仅读入一小部分的数据,待这部分数据被“消费”后,再读入新的数据。转换思路,不一定必须将要用到的数据一次性全部装入内存

流有哪几种类型?

stream 提供了 4 种基本的流类型:

  • Writable:可写入数据流。
  • Readable:可读取数据流。
  • Duplex:双工流,可读又可写。例如:net.Socket
  • Transform:转换流,它是 Duplex 流的一种。它用于在读写过程中,加工数据。例如:zlib

内部缓冲的作用?

在文档开头部分,就用一节专门提到了“缓冲”。可读流和可写流都会在内部缓存器存储数据,Duplex 和 Transform 也在内部维护了缓存器。在开发者基于流开发时,可以通过传递highWaterMark参数,来修改默认缓冲的大小。

理解缓冲的作用,就要先搞明白缓冲的处理流程:

  • 可写流中:
    1. 调用 write()向流中写入数据
    2. 数据进入可写流缓冲
    3. 数据等待被消费
  • 可读流中:
    1. 调用 read()向流中读取数据
    2. 数据进入可读缓存
    3. 数据等待被消费

在这个过程中,如果可写/可读缓冲中的数据总大小超过了 highWaterMark

  • 可写流的 write()会返回 false,直到缓冲可以继续写入,触发drain事件
  • 可读流会停止从底层资源读取数据

有了内部缓冲机制,就可以限制流的读写速度,防止内存被压垮,解决背压问题。

流动模式 vs 暂停模式

这是可读流的两种模式。可读流开始时是处于暂停模式,之后根据监听的事件、调用的 api,来进行两种模式的切换。文档上写的很详细,但是也会让初学者感到困扰。这里直接从编码风格触发,来学习这两种模式。

编码风格一:监听 readable 事件 + read()

如果可读流监听了 readable 事件,那么处于暂停模式。readable 事件回调触发的条件:

  • 有新的数据
  • 流到达尽头

由于处于暂停模式,因此在事件回调函数中,需要使用 read()来读取数据。

const fs = require("fs");

const rs = fs.createReadStream("./package.json");

rs.on("readable", () => {
    let chunk = null;
    // 在read()时,while循环是必须的
    // 这里的read()可以理解成消费的行为
    while ((chunk = rs.read()) !== null) {
        process.stdout.write(chunk);
    }
})
    .on("end", () => {
        console.log("读取结束");
    })
    .on("error", err => console.error(err));

编码风格二(官方推荐):监听 data 事件 + pause() + resume()

如果可读流监听 data 事件,那么会自动切换至流动模式,事件回调中可以得到数据。不需要调用 read(),换句话说 read()只在暂停模式下调用。

const fs = require("fs");

const rs = fs.createReadStream("./package.json");

rs.on("data", chunk => {
    process.stdout.write(chunk);
})
    .on("end", () => {
        console.log("读取结束");
    })
    .on("error", err => console.error(err));

如果配合可写流进行更复杂操作,发生了背压问题,没有可用的消费者来处理数据,则数据将会丢失。为了方便理解,可以认为它是自动调用 read()进行消费。此时使用 pause()来切换到暂停模式,待消费者可以处理时,再调用 resume()恢复流动模式。

背压问题

如何产生的?

当处理数据的时候,如果数据生产者产生数据的速度 > 数据消费者处理数据的速度,那么由于速度差异没被消耗完的数据就会持续堆积下来,这个现象就是背压(也称积压)。

它会导致资源过度占用,内存耗尽,也会增加 GC 的负担。

如何解决背压问题?

结合前面对缓冲的讲解,在向可写流写入数据的时候,如果超过可写缓存,应该暂停数据读取,等待缓存中数据被消耗完毕后,再继续流动可读流。

下面是一个基于 stream,复制大文件的函数:

function copyBigFile(src, dest, callback) {
    if (typeof callback !== "function") {
        throw TypeError("Callback should be function");
    }

    const rs = fs.createReadStream(src);
    const ws = fs.createWriteStream(dest);

    rs.on("data", chunk => {
        if (!ws.write(chunk)) {
            rs.pause();
        }
    }).on("end", () => {
        ws.end();
        callback();
    });

    ws.on("drain", () => rs.resume());
}

更好的解决方案是,使用可读流上的pipe()函数,或者 stream 模块的pipeline()函数。

pipe 函数实现了以下几个功能:

  • 不断从来源可读流中获得一个指定长度的数据。
  • 将获取到的数据写入目标可写流。
  • 平衡读取和写入速度,防止读取速度大大超过写入速度时,出现大量滞留数据。

用它来处理背压问题非常简单。前面复制文件的函数改写为:

function copyBigFile(src, dest, callback) {
    if (typeof callback !== "function") {
        throw TypeError("Callback should be function");
    }

    const rs = fs.createReadStream(src);
    const ws = fs.createWriteStream(dest);

    rs.pipe(ws).on("close", callback);
}

pipeline 函数是 pipe 的语法糖,它的参数可以是多个流+一个回调函数:

const { pipeline } = require("stream");

function copyBigFile(src, dest, callback) {
    if (typeof callback !== "function") {
        throw TypeError("Callback should be function");
    }

    const rs = fs.createReadStream(src);
    const ws = fs.createWriteStream(dest);
    pipeline(rs, ws, err => callback(err));
}

如何定制流?

在实现自己的类库的时候,可以借助流来处理大容量数据。nodejs 提供给开发者 API,来定制 4 种类型的流。

实现可写流

继承 Writable 类,需要重写_write()方法。并且在实现中必须调用callback()函数,无论成功失败。

下面实现了一个解码器,并且能将解码结果处理成小写:

const { Writable } = require("stream");

class LowerWritable extends Writable {
    constructor(options) {
        super(options);
        this._data = "";
    }
    // 可以去掉 callback 调用看看效果
    _write(chunk, encoding, callback) {
        try {
            chunk = encoding === "buffer" ? chunk.toString() : chunk;
            this._data += chunk.toLocaleLowerCase();
            callback(null);
        } catch (err) {
            callback(err);
        }
    }
    // 流关闭前被调用
    _final(callback) {
        this._data += "\nfinish";
        callback(null);
    }
}

const lw = new LowerWritable();
lw.write("HELLO ");
lw.write(Buffer.from("WORLD!"));
lw.end();

lw.on("finish", () => {
    console.log(lw._data);
});

实现可读流

继承 Readable 类,需要重写_read 方法。内部通过 push 方法来推入数据。

为了方便演示,先实现一个产生数据的类,它继承自 EventEmitter :

const EventEmitter = require("events");

class DataSource extends EventEmitter {
    constructor(str) {
        super();
        this._str = str;
        this._offset = 0;
    }

    read() {
        if (this._offset < this._str.length) {
            // 每次读取一个字符
            this.emit("data", this._str[this._offset]);
            this._offset += 1;
        } else {
            // 读取完成
            this.emit("end");
        }
    }

    stop() {
        this._offset -= 1;
    }
}

最后实现一个可读流,并在读取过程中,将字符转成小写:

const { Readable } = require("stream");

class LowerReadable extends Readable {
    constructor(src) {
        super();
        this._src = src;

        this._src.on("data", chunk => {
            // 一个字符,一个字符的push,而不是一次性push多个字符
            if (!this.push(chunk.toLocaleLowerCase())) {
                this._src.stop();
            }
            // 这里没必要显示地调用 DataSource.read(),否则爆栈
            // 因为 this._read() 会被一直调用,从而触发 data 事件
            // this._src.read();
        });

        this._src.on("end", () => this.push(null));
    }

    _read(size) {
        // 从数据源读取按照读取一个字符长度的数据
        this._src.read();
    }
}

// test code
const dataSrc = new DataSource("HELLO WORLD!");
const lowerStream = new LowerReadable(dataSrc);
lowerStream.pipe(process.stdout);

实现双工和转换流

双工流的实现就是既要重写可写流的_write 方法,也要重写可读流的_read 方法。

转换流的实现是不同的,它需要重写_transform 方法,来接收并产生输出。下面是一段利用转换流来给输出添加标志符的代码:

const { Transform } = require("stream");

const prompt = Buffer.from("<<< ");

class InputLowerTransform extends Transform {
    constructor(options) {
        super(options);
    }
    // 将可写端写入的数据变换后添加到可读端
    _transform(chunk, encode, cb) {
        chunk = Buffer.concat([prompt, chunk], prompt.length + chunk.length);
        // 调用push方法将变换后的数据添加到可读端
        this.push(chunk);
        cb();
    }
}

const transStream = new InputLowerTransform();
process.stdin.pipe(transStream).pipe(process.stdout);

效果如下:

参考连接

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • LeetCode 684.冗余连接 - JavaScript

    输入一个图,该图由一个有着 N 个节点 (节点值不重复 1, 2, …, N) 的树及一条附加的边构成。附加的边的两个顶点包含在 1 到 N 中间,这条附加的边...

    心谭博客
  • LeetCode 295.数据流的中位数 - JavaScript

    题目描述:中位数是有序列表中间的数。如果列表长度是偶数,中位数则是中间两个数的平均值。

    心谭博客
  • 剑指offer - 包含min函数的栈 - JavaScript

    题目描述:定义栈的数据结构,请在该类型中实现一个能够得到栈中所含最小元素的 min 函数(时间复杂度应为 O(1))。

    心谭博客
  • 使用react render props实现倒计时

    react的组件模式可以观看Michael Chan的演讲视频,平时大家常听到的react模式也是HOC, HOC的使用场景很多,譬如react-redu...

    IMWeb前端团队
  • 50行javaScript代码实现简单版的 call , apply ,bind 【中级前端面试基础必备】

    其实 this 的指向,始终坚持一个原理:this 永远指向最后调用它的那个对象,这就是精髓。最关键所在

    Peter谭金杰
  • Oculus + Node.js + Three.js 打造VR世界

    Oculus Rift 是一款为电子游戏设计的头戴式显示器。这是一款虚拟现实设备。这款设备很可能改变未来人们游戏的方式。 周五Hackday Showcase的...

    李海彬
  • 入职第二天:使用koa搭建node server是种怎样的体验

    今天是我入职第二天,leader跟我说,昨天配置好了服务端渲染的文件,今天就先研究研究如何使用koa来搭建一个node server吧!

    闰土大叔
  • 大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上

    尖叫提示1:如果出现如下错误,请下载对应依赖包 ./configure: error: C compiler cc is not found ./configu...

    黑泽君
  • day25_Struts2学习笔记_01

      把重复性的繁琐的代码封装起来。使程序员在编码中把更多的精力放业务需求的分析和理解上面。 特点:封装了很多细节,程序员在使用的时候会非常简单。

    黑泽君
  • 体验concent依赖收集,赋予react更多想象空间

    concent v2版本的发布了,在保留了和v1一模一样的api使用方式上,内置了依赖收集系统,支持同时从状态、计算结果和副作用3个维度收集依赖,建立其精确更新...

    腾讯新闻前端团队

扫码关注云+社区

领取腾讯云代金券