前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Node.js 流编程

Node.js 流编程

原创
作者头像
多云转晴
发布2024-01-21 19:51:21
1350
发布2024-01-21 19:51:21
举报
文章被收录于专栏:webTowerwebTower

缓冲模式和流模式

  • 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如 fs.writeFilefs.readFile 等;
  • 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。

假如我们要读取一份特别庞大的文件,这份文件有好几个 GB 大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几 GB 的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。

在 Node.js 中可以通过 buffer.constants.MAX\_LENGTH 查看某套开发环境最多可支持多少字节的缓冲区。

缓冲模式压缩文件

代码语言:ts | buf-mode-gzip.ts
复制
// main.js

import { gzip } from 'node:zlib';

import { promisify } from 'node:util';

import { promises as fs } from 'node:fs';



const gzipPromise = promisify(gzip);



const filename = process.argv[2];



async function main() {

    const data = await fs.readFile(filename);

    const gzippedData = await gzipPromise(data);

    await fs.writeFile(`${filename}.gz`, gzippedData);

    console.log('File successfully compressed');

}



main();

运行 node ./main.js ./test.txt 就可以把 test.txt 文件压缩成 .gz 格式的压缩包了。

流模式压缩文件

代码语言:ts | stream-mode-gzip.ts
复制
// main.js

import { createGzip } from 'node:zlib';

import { createReadStream, createWriteStream } from 'node:fs';



const filename = process.argv[2];



createReadStream(filename)

    .pipe(createGzip())

    .pipe(createWriteStream(`${filename}.gz`))

    .on('finish', () => console.log('File successfully compressed'));

流对象结构

Node.js 平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream 核心模块提供的:

  • Readable
  • Writable
  • Duplex
  • Transform

每个 stream 类的对象,本身也都是一个 EventEmmiter 实例,所有流对象实际上可以触发许多事件,比如:

  • Readable 流在读取完毕时会触发 end 事件;
  • Writable 流在写入完毕后会触发 finish 事件;
  • 如果操作过程中发生错误,则会触发 error 事件;

流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:

  • 二进制模式(Binary mode):以 chunk 的形式串流数据,这种模式可以用来处理缓冲或者字符串;
  • 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如 Rxjs 这个库);

Readable 流(可读流)

要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式(non-flowing),也叫暂停模式,另一种是流动模式(flowing)。

非流动模式

下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。

代码语言:ts | read-stdin.ts
复制
process.stdin.on('readable', () => {

    let chunk: Buffer | null;

    console.log('New data available');

    while((chunk = process.stdin.read()) !== null) {

        // 回显

        console.log(

            `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

        );

    }

}).on('end', () => {

    // Windows 上是 Ctrl+Z,Linux和Mac上是 Ctrl+D

    console.log('End of stream');

});

readable 一旦发生(按下 Enter 键),就说明有新的数据可以读取了。

process.stdin.read() 方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。

流动模式

流动模式下,我们不通过 read() 方法提取数据,而是等着流对象把数据推送到 data 监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:

代码语言:ts | read-stdin.ts
复制
process.stdin.on('data', (chunk) => {

    console.log('New data available');

    console.log(

        `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

    );

})

.on('end', () => {

    console.log('End of stream');

});

实现自己的 Readable 流

自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 \_read([size]) 方法提供实现代码,而这个方法内部又必须 readable.push(chunk) 这种操作向缓冲区里面填入数据。

\_read() 方法和 read() 方法不通,后者是给流对象的消费方使用的,而 \_read() 方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 \_read() 将在每次调用 this.push(dataChunk) 后再次调用。 \_read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。

比如下面代码,可以生成随机字符串流对象:

代码语言:ts | random-stream.ts
复制
import { Readable, ReadableOptions } from 'node:stream';

import Chance from 'chance';



const chance = new Chance();



export class RandomStream extends Readable {

    emittedBytes = 0;

    constructor(options?: ReadableOptions) {

        super(options); // 继承

    }



    \_read(size: number): void {

        // 生成长度为 size 的随机字符串

        const chunk = chance.string({ length: size });

        // 推入内部缓冲区

        this.push(chunk, 'utf8');

        this.emittedBytes += chunk.length;

        // 百分之 5 的概率返回 true,并推入 null

        // 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

}
使用 RandomStream
代码语言:ts | random-stream.ts
复制
const randomStream = new RandomStream({

    highWaterMark: 10

});

randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);

});

\_read() 函数中接收一个 size 数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即 highWaterMark 配置项),当然这只是一个建议,不是强迫你必须这么做。

ReadableOptions 接收的 options 参数可能会有这样一些属性:

  • encoding: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是 null
  • objectMode: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是 false
  • highWaterMark: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,它的默认值是 16KB

简化版定制方案

如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options),并把一个包含 read() 方法的对象传给 options 参数即可。

代码语言:ts | random-stream.ts
复制
let emittedBytes = 0;

const randomStream = new Readable({

    highWaterMark: 10,

    read(size: number): void {

        const chunk = chance.string({ length: size });

        this.push(chunk, 'utf8');

        emittedBytes += chunk.length;

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

});



randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${emittedBytes}) bytes of radom data`);

});

Readable.from

// main.js

有个叫做 Readable.from() 的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable 对象当做数据源,轻松构建 Readable 流。

代码语言:ts | exercise.ts
复制
import { Readable } from 'node:stream';



const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);



arrStream.on('data', (char: string) => {

    console.log("🚀 ~ file: exercise.ts:6 ~ arrStream.on ~ char:", char);

}).on('end', () => {

    console.log("end");

});

Writable 流(可写流)

向 Writable 流推送数据,是相当容易的,我们只需要使用 write 方法就行了,方法前面是:

代码语言:ts
复制
writable.write(chunk, [encoding], [callback])

其中 encoding 参数和 callback 参数是可选的。如果 chunk 是字符串,那么 encoding 参数默认是 utf8,如果 chunk 是 Buffer,那么该参数的值会为系统所忽略。callback 表示这个函数会在系统把数据块写入底层资源的时候,得到调用。

如果想告诉 Writable 流,已经没有数据需要写入了,那么应该调用 end() 方法:

代码语言:ts
复制
writable.end([chunk], [encoding], [callback]);

下面代码我们创建了一个小的 HTTP 服务器程序,让它输出一些随机字符串:

代码语言:ts | exercise.ts
复制
const chance = new Chance();

const server = createServer((\_, res) => {

    res.writeHead(200, { 'Content-Type': 'text/plain' });

    while(chance.bool({ likelihood: 95 })) {

        res.write(`${chance.string()}\n`);

    }

    res.end('\n\n');

    res.on('finish', () => console.log('All data send.'));

});

server.listen(8082, () => {

    console.log('listening on http://localhost:8082');

});

res 对象不仅是 http.ServerResponse 实例,同时也是一个 Writable 流。上面代码我们使用 curl localhost:8082 命令就可以看到服务器发来的随机字符串了。

实现 Writable 流

要实现一种新的 Writable 流,我们可以继承 Writable 类,并实现 \_write() 方法。

假如我们要实现这样一种 Writable 流,接收下面这种格式的对象:

代码语言:js
复制
{

    path: <文件路径>

    content: <字符串或 buffer>

}

每收到这样一个对象,我们就会把 path 所指的路径下创建一份文件,并把 content 属性的内容存入该文件。

大家应该意识到,输入给我们这种 Writable 流的数据,并不是字符串或 Buffer,而应该是对象,因此这种流必须在对象模式下运作。代码如下:

代码语言:ts | to-file-stream.ts
复制
import { dirname } from 'node:path';

import { promises as fs } from 'node:fs';

import { Writable, WritableOptions } from 'node:stream';



interface ChunkType {

    path: string;

    content: string | Buffer;

}

export class ToFileStream extends Writable {

    constructor(options: WritableOptions) {

        super({ ...options, objectMode: true });

    }



    \_write(chunk: ChunkType, \_encoding: BufferEncoding, callback: (error?: Error) => void) {

        // 递归创建多级文件夹,然后写入文件

        fs.mkdir(dirname(chunk.path), { recursive: true })

            .then(() => fs.writeFile(chunk.path, chunk.content))

            .then(() => callback())

            .catch(callback);

    }

}

使用:

代码语言:ts | to-file-stream.ts
复制
tfs.write({

    path: join('files', 'file1.txt'),

    content: 'Hello',

});

tfs.write({

    path: join('files', 'file2.txt'),

    content: 'Node.js',

});

tfs.write({

    path: join('files', 'file3.txt'),

    content: 'streams',

});



tfs.end(() => console.log('All files created.'));

另外,通 Readable 流一样,Writable 流也支持简化版的定制方案:

代码语言:ts | to-file-stream.ts
复制
const tfs = new Writable({

    objectMode: true,

    write(chunk: ChunkType, \_encoding: BufferEncoding, callback) {

        fs.mkdir(dirname(chunk.path), { recursive: true })

            .then(() => fs.writeFile(chunk.path, chunk.content))

            .then(() => callback())

            .catch(callback);

    }

});

backpressure(防拥堵机制)

写入数据的速度可能要比消耗数据的速度要快,为了应对这种情况,流对象会把写进来的数据先放入缓冲区,但如果给该对象写入数据的那个人不知道已经出现这种情况,那么还是会不断地写入,导致内部缓冲区里面的数据越积越多,让内存使用量变得比较高。

为了提醒写入方注意这种问题,writable.write() 方法会在内部缓冲区触碰 highWaterMark(内部缓冲区的数据上限) 上限的时候,返回 false,以表明此时不应该再向其中写入内容。当缓冲区清空时,流对象会触发 drain 事件,以提示现在又可以向里面写入数据了。这套机制就叫做 backpressure(防拥堵机制)。

backpressure 只是一套建议机制,而不是强制实施的。即便 write() 返回 false,我们也还是可以忽略这个信号,继续往里面写入,让缓冲区越变越大。

这套机制其实在 Readable 流中也有类似的体现,在实现 \_read() 方法时,如果发现自己调用 push() 方法得到的结果是 false,那就不应该再向其中推送新数据了。这个问题仅仅需要由实现 Readable 流的人来担心,而不太需要由使用这种流的人负责处理。

下面代码实现的是防拥堵机制的输出随机字符 HTTP 服务器:

代码语言:ts | exercise.ts
复制
const chance = new Chance();

const server = createServer((\_, res) => {

    res.writeHead(200, { 'Content-Type': 'text/plain' });

    function generateMore() {

        while(chance.bool({ likelihood: 95 })) {

            const randomChunk = chance.string({

                length: (16 \* 1024) - 1

            });

            const shouldContinue = res.write(`${randomChunk}\n`);

            if (!shouldContinue) {  // 是否已拥堵

                console.log('back-pressure');

                // 监听 drain 事件,表面现在又可以向里面写入数据了

                return res.on('drain', generateMore);

            }

        }

        res.end('\n\n');

    }

    generateMore();

    res.on('finish', () => console.log('All data send.'));

});

server.listen(8082, () => {

    console.log('listening on http://localhost:8082');

});

以上就是可读流和可写流的全部内容了,双工流和转换流下期分享!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 缓冲模式和流模式
    • 缓冲模式压缩文件
      • 流模式压缩文件
      • 流对象结构
      • Readable 流(可读流)
        • 非流动模式
          • 流动模式
            • 实现自己的 Readable 流
              • 使用 RandomStream
            • 简化版定制方案
              • Readable.from
              • Writable 流(可写流)
                • 实现 Writable 流
                  • backpressure(防拥堵机制)
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档