前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第七十七期:Node中的streams流(pipe管道和pump泵)

第七十七期:Node中的streams流(pipe管道和pump泵)

作者头像
terrence386
发布2022-07-15 10:53:01
9410
发布2022-07-15 10:53:01
举报
文章被收录于专栏:JavaScript高级程序设计

Node中的streams流

streams流是Node中的最好的特性之一。它在我们的开发过程当中可以帮助我们做很多事情。比如通过流的方式梳理大量数据,或者帮我们分离应用程序。

和streams流相关的内容有哪些呢?大致有这么几点:

  • 处理大量数据
  • 使用管道方法
  • 转换流
  • 读写流
  • 解耦I/O

处理无限量的数据

使用data事件,我们可以在消耗很少内存的情况下去处理一小块文件。

我们其实也可以处理无限量的数据,比如:我们可以从伪随机数生成器中读取字节数。

代码语言:javascript
复制
const rs = fs.createReadStream('/dev/urandom')
let size = 0
rs.on('data', (data) => {
  size += data.length
  console.log('文件大小:', size)
})

结果如下:

尽管程序一直在执行,并且文件的数量也是无限的,但是程序并没有崩溃。

可伸缩性是流的特性之一,大多数使用流编写的程序都可以很好的伸缩任何输入的大小。

flow模式(flow Mode)与pull-base模式(pull-based stream)

Node中的流可以处于流模式或者pull-based模式。当我们将数据添加到流,它就进入flow模式,这表示:只要有数据,就会调用data事件。

在上面的示例代码中,readStream刚刚创建的时候,并不处于flow模式,我们通过data事件将它放置到flow模式。

如果我们想停止它,我们可以调用可读流的暂停方法pause()。如果我们想重新开启它,我们可以调用resume()方法。

但是flow模式也可能会有问题,因为在某些情况下,即使流暂停,流也可能被传入数据的淹没,传入流可能不受pause()方法控制。

从流中提取数据的另一种方法是等待readable事件,然后不断调用流的read方法,直到返回null(即流终止符实体)。通过这种方式,我们可以从流中提取数据,并且可以在必要时停止提取。

换句话说,我们不需要告诉流暂停然后继续;我们可以根据需要启动或者停止它。

看个例子:

代码语言:javascript
复制
const fs = require('fs')
const rs = fs.createReadStream(__filename)
rs.on('readable', () => {
  let data = rs.read()
  while (data !== null) {
    console.log('读取的数据:', data)
    data = rs.read()
  }
})

rs.on('end', () => {
  console.log('没有数据了')
})

这个例子我们通过readable事件去判断是否有数据,而不是直接调用data事件。

当然,从流中提取数据更好的方法是通过pipe(管道)将我们的数据传输到我们创建的流中。这样一来管理内存的问题就可以在内部进行。

理解stream流的事件

所有流都继承自EventEmitter类并带有一系列不同的事件。了解一些我们经常用的事件,对于我们在处理流的过程当中非常有用。

第一,data事件。从可读流中读取新数据时触发。data数据作为事件处理程序的第一个参数。需要注意的是,与其他事件处理程序不同,附加数据侦听器会产生副作用。当连接第一个数据侦听器时,我们的流将被取消暂停。

第二,end事件。当可读流中没有数据时触发。

第三,finish事件。当可写流结束且所有挂起的写入都已完成时发出。

第四,close事件。通常在流完全关闭时发出,stream不一定会触发事件。

第五,puse事件。用于暂停一个可读流。大部分情况我们可以忽略这个方法。

第六,resume事件。用于重启一个可读流。

pipe方法

pipe方法用来将两个stream连接到一起。shell脚本中我们经常使用 | 管道符号来实现这个功能。通过这些方式,我们可以将多个管道连接在一起,更加方便的处理数据。

Streams的API 也为我们提供了pipe方法。每个可读都有一个pipe方法。

我们还是用一个例子来感受一下:

代码语言:javascript
复制
const zlib = require('zlib')
const map =  require('tar-map-stream')
const decompress = zlib.createGunzip()
const whoami = process.env.USER ||process.env.USERNAME

const convert = map((header)=>{
  header.uname = whoami
  header.mtime = new Date()
  header.name = header.name.replace('node-v0.1.100','edon-v0.0.0')
  return header
})
const compress = zlib.createGzip()

process.stdin.pipe(decompress)
  .pipe(convert)
  .pipe(compress)
  .pipe(process.stdout)

然后执行:

代码语言:javascript
复制
curl https://nodejs.org/dist/v0.1.100/node-v0.100.tar.gz | node read.js > edon.tar.gz

pipe方法将数据侦听绑定到streams流的源头,然后将接收到的数据导流到目标streams中。

当我们通过pipe将多个streams串联在一起时,我们是实际在告诉Node用这些流来解析数据。

使用pipe管道处理数据,比使用data方法相对来说更加安全一些,因为它可以自由的处理背压(backpressure),背压这个概念我们可以理解为内存管理。

比如,当快速生成数据的流可能会压到较慢的写入流时,需要使用缓冲压力策略来防止内存填满和进程崩溃。管道方法提供了这种背压。

上面的代码中,我们通过 | 管道符号将请求的数据导流到我们的 index.js 脚本中。

然后process.stdin标准I/O通过pipe方法一层一层的往下传递,最终通过重定向>存入edon.tar.gz文件中。

整个过程如下:

curl---> process.stdin---> decompress ---> content---> compress---> process.stdout---> endon.tar.gz

保持管道流的活力

通常情况下,当原始流通过管道连接到目标流时,目标流会随着原始流的结束而结束。

有时候我们希望在原始流结束之后额外再去做一些别的操作。这时候怎么办呢,我们来看一个例子:

名字随便起一个,start.js

代码语言:javascript
复制
const net = require('net')
const fs = require('fs')

net
  .createServer((socket) => {
    const content = fs.createReadStream(__filename)
    content.pipe(socket)
    content.on('end', () => {
      socket.end('\n========Footer=====\n')
    })
  })
  .listen(4000)

这时候我们执行

代码语言:javascript
复制
node start.js

然后我们发一个请求:

代码语言:javascript
复制
curl http://localhost:4000

会发现有报错信息

这是因为:当content可读流结束后,与之连接的socket流也就结束了。这时候,我们想要在socket后面添加内容就不可能了。

我们可以修改我们的代码如下:

代码语言:javascript
复制
const net = require('net')
const fs = require('fs')

net
  .createServer((socket) => {
    const content = fs.createReadStream(__filename)
    content.pipe(socket, { end: false })
    content.on('end', () => {
      socket.end('=========Footer========')
    })
  })
  .listen(4000)

这次我们在content.pipe中加入了第二个参数end:false。这告诉管道方法避免在源流结束时结束目标流,这时候我们的代码就不会报错。

相应的我们可以收到返回的信息:

生产中的管道流

pipe方法是streams流中一个非常重要的特性。它可以让我们把多个流组合成一行代码。

作为Node核心的一部分,它在进程运行时间不太重要的情况下非常有用。比如我们常用的cli工具。

但是不好的一点是它的错误处理。假如管道流中有一个流出现错误,它往往直接取消管道连接,然后将剩余的流进行销毁。这样一来,他们就不会泄露资源,但是有可能会导致内存泄露。

再来看一个例子:

代码语言:javascript
复制
const http = require('http')
const fs = require('fs')

const server = http.createServer((req, res) => {
  fs.createReadStream('veryBigData.file').pipe(res)
})

server.listen(4000)

这个服务在用管道给用户返回数据,因此很有可能会产出内存开销以及文件相关的信息泄露。

如果http响应在文件被完全传输给用户之前关闭,文件相关的一些信息肯定会泄露,以及文件流也会产生一些内存开销,文件流也会留在内存中,因为我们没有关闭它。

所以我们需要一些错误处理机制,能够在适当的时候销毁我们管道中的流。

这需要提到另外一个模块儿---pump(泵)。pump专门用来处理这些问题。

pump(泵)

我们再来尝试一个例子:

代码语言:javascript
复制
const fs = require('fs')
const http = require('http')
const pump = require('pump')

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream('veryBigData.file')
  pump(stream, res, done)
})

function done(err) {
  if (err) {
    return console.log('文件导流出现错误----', err)
  }

  console.log('文件导流成功')
}

server.listen(4000)

这时候,运行这个文件,发起请求,我们会发现它报错了。

每个传递到pump方法中的流都会被传给下一个流。如果上一个传入的是个函数,pump会在所有流都完成后执行这个方法。

pump内部有些附加的方法。比如关闭,错误处理以及在不影响其他流的情况下关闭另外一个流的方法。

如果其中一个流关闭,其他流将被销毁,并调用传递给pump的回调函数。

当然我们也可以手动去处理这些错误或者在数据关闭时销毁流,比如:

代码语言:javascript
复制

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream('veryBigData.file')
  stream.pipe(res)
  res.on('close',()=>{
    stream.destory()
  })
})

server.listen(4000)

但是通常情况下,使用pump(泵)更加方便,也更加安全。

pumpify

在编写管道的时候,尤其是作为一个单独的模块的时候。我们可能会希望将这些方法导出为外部的用户。这时候怎么办呢?

正如我们之前说的:管道由一系列传输流组成。我们将数据写入管道中的第一个流,然后数据通过它传输,直到写入最后一个流。

我们再看个例子:

代码语言:javascript
复制
const {createGzip} = require('zlib')
const {crateCipher, createCipheriv} = require('crypto')
const pumpify = require('pumpify')
const base64 = require('base64-encode-stream')

function pipeline(){
  const stream1 = createGzip()
  const stream2 = createCipheriv()
  const stream3 = base64()

  return pumpify(stream1,stream2,stream3)
}

这种写法有点类似Promisify

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JavaScript高级程序设计 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Node中的streams流
  • 处理无限量的数据
  • flow模式(flow Mode)与pull-base模式(pull-based stream)
  • 理解stream流的事件
  • pipe方法
  • 生产中的管道流
  • pump(泵)
  • pumpify
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档