streams流是Node中的最好的特性之一。它在我们的开发过程当中可以帮助我们做很多事情。比如通过流的方式梳理大量数据,或者帮我们分离应用程序。
和streams流相关的内容有哪些呢?大致有这么几点:
使用data事件,我们可以在消耗很少内存的情况下去处理一小块文件。
我们其实也可以处理无限量的数据,比如:我们可以从伪随机数生成器中读取字节数。
const rs = fs.createReadStream('/dev/urandom')
let size = 0
rs.on('data', (data) => {
size += data.length
console.log('文件大小:', size)
})
结果如下:
尽管程序一直在执行,并且文件的数量也是无限的,但是程序并没有崩溃。
可伸缩性是流的特性之一,大多数使用流编写的程序都可以很好的伸缩任何输入的大小。
Node中的流可以处于流模式或者pull-based模式。当我们将数据添加到流,它就进入flow模式,这表示:只要有数据,就会调用data事件。
在上面的示例代码中,readStream刚刚创建的时候,并不处于flow模式,我们通过data事件将它放置到flow模式。
如果我们想停止它,我们可以调用可读流的暂停方法pause()。如果我们想重新开启它,我们可以调用resume()方法。
但是flow模式也可能会有问题,因为在某些情况下,即使流暂停,流也可能被传入数据的淹没,传入流可能不受pause()方法控制。
从流中提取数据的另一种方法是等待readable事件,然后不断调用流的read方法,直到返回null(即流终止符实体)。通过这种方式,我们可以从流中提取数据,并且可以在必要时停止提取。
换句话说,我们不需要告诉流暂停然后继续;我们可以根据需要启动或者停止它。
看个例子:
const fs = require('fs')
const rs = fs.createReadStream(__filename)
rs.on('readable', () => {
let data = rs.read()
while (data !== null) {
console.log('读取的数据:', data)
data = rs.read()
}
})
rs.on('end', () => {
console.log('没有数据了')
})
这个例子我们通过readable事件去判断是否有数据,而不是直接调用data事件。
当然,从流中提取数据更好的方法是通过pipe(管道)将我们的数据传输到我们创建的流中。这样一来管理内存的问题就可以在内部进行。
所有流都继承自EventEmitter类并带有一系列不同的事件。了解一些我们经常用的事件,对于我们在处理流的过程当中非常有用。
第一,data事件。从可读流中读取新数据时触发。data数据作为事件处理程序的第一个参数。需要注意的是,与其他事件处理程序不同,附加数据侦听器会产生副作用。当连接第一个数据侦听器时,我们的流将被取消暂停。
第二,end事件。当可读流中没有数据时触发。
第三,finish事件。当可写流结束且所有挂起的写入都已完成时发出。
第四,close事件。通常在流完全关闭时发出,stream不一定会触发事件。
第五,puse事件。用于暂停一个可读流。大部分情况我们可以忽略这个方法。
第六,resume事件。用于重启一个可读流。
pipe方法用来将两个stream连接到一起。shell脚本中我们经常使用 | 管道符号来实现这个功能。通过这些方式,我们可以将多个管道连接在一起,更加方便的处理数据。
Streams的API 也为我们提供了pipe方法。每个可读都有一个pipe方法。
我们还是用一个例子来感受一下:
const zlib = require('zlib')
const map = require('tar-map-stream')
const decompress = zlib.createGunzip()
const whoami = process.env.USER ||process.env.USERNAME
const convert = map((header)=>{
header.uname = whoami
header.mtime = new Date()
header.name = header.name.replace('node-v0.1.100','edon-v0.0.0')
return header
})
const compress = zlib.createGzip()
process.stdin.pipe(decompress)
.pipe(convert)
.pipe(compress)
.pipe(process.stdout)
然后执行:
curl https://nodejs.org/dist/v0.1.100/node-v0.100.tar.gz | node read.js > edon.tar.gz
pipe方法将数据侦听绑定到streams流的源头,然后将接收到的数据导流到目标streams中。
当我们通过pipe将多个streams串联在一起时,我们是实际在告诉Node用这些流来解析数据。
使用pipe管道处理数据,比使用data方法相对来说更加安全一些,因为它可以自由的处理背压(backpressure),背压这个概念我们可以理解为内存管理。
比如,当快速生成数据的流可能会压到较慢的写入流时,需要使用缓冲压力策略来防止内存填满和进程崩溃。管道方法提供了这种背压。
上面的代码中,我们通过 | 管道符号将请求的数据导流到我们的 index.js 脚本中。
然后process.stdin标准I/O通过pipe方法一层一层的往下传递,最终通过重定向>
存入edon.tar.gz文件中。
整个过程如下:
curl---> process.stdin---> decompress ---> content---> compress---> process.stdout---> endon.tar.gz
保持管道流的活力
通常情况下,当原始流通过管道连接到目标流时,目标流会随着原始流的结束而结束。
有时候我们希望在原始流结束之后额外再去做一些别的操作。这时候怎么办呢,我们来看一个例子:
名字随便起一个,start.js
const net = require('net')
const fs = require('fs')
net
.createServer((socket) => {
const content = fs.createReadStream(__filename)
content.pipe(socket)
content.on('end', () => {
socket.end('\n========Footer=====\n')
})
})
.listen(4000)
这时候我们执行
node start.js
然后我们发一个请求:
curl http://localhost:4000
会发现有报错信息
这是因为:当content可读流结束后,与之连接的socket流也就结束了。这时候,我们想要在socket后面添加内容就不可能了。
我们可以修改我们的代码如下:
const net = require('net')
const fs = require('fs')
net
.createServer((socket) => {
const content = fs.createReadStream(__filename)
content.pipe(socket, { end: false })
content.on('end', () => {
socket.end('=========Footer========')
})
})
.listen(4000)
这次我们在content.pipe中加入了第二个参数end:false。这告诉管道方法避免在源流结束时结束目标流,这时候我们的代码就不会报错。
相应的我们可以收到返回的信息:
pipe方法是streams流中一个非常重要的特性。它可以让我们把多个流组合成一行代码。
作为Node核心的一部分,它在进程运行时间不太重要的情况下非常有用。比如我们常用的cli工具。
但是不好的一点是它的错误处理。假如管道流中有一个流出现错误,它往往直接取消管道连接,然后将剩余的流进行销毁。这样一来,他们就不会泄露资源,但是有可能会导致内存泄露。
再来看一个例子:
const http = require('http')
const fs = require('fs')
const server = http.createServer((req, res) => {
fs.createReadStream('veryBigData.file').pipe(res)
})
server.listen(4000)
这个服务在用管道给用户返回数据,因此很有可能会产出内存开销以及文件相关的信息泄露。
如果http响应在文件被完全传输给用户之前关闭,文件相关的一些信息肯定会泄露,以及文件流也会产生一些内存开销,文件流也会留在内存中,因为我们没有关闭它。
所以我们需要一些错误处理机制,能够在适当的时候销毁我们管道中的流。
这需要提到另外一个模块儿---pump(泵)。pump专门用来处理这些问题。
我们再来尝试一个例子:
const fs = require('fs')
const http = require('http')
const pump = require('pump')
const server = http.createServer((req, res) => {
const stream = fs.createReadStream('veryBigData.file')
pump(stream, res, done)
})
function done(err) {
if (err) {
return console.log('文件导流出现错误----', err)
}
console.log('文件导流成功')
}
server.listen(4000)
这时候,运行这个文件,发起请求,我们会发现它报错了。
每个传递到pump方法中的流都会被传给下一个流。如果上一个传入的是个函数,pump会在所有流都完成后执行这个方法。
pump内部有些附加的方法。比如关闭,错误处理以及在不影响其他流的情况下关闭另外一个流的方法。
如果其中一个流关闭,其他流将被销毁,并调用传递给pump的回调函数。
当然我们也可以手动去处理这些错误或者在数据关闭时销毁流,比如:
const server = http.createServer((req, res) => {
const stream = fs.createReadStream('veryBigData.file')
stream.pipe(res)
res.on('close',()=>{
stream.destory()
})
})
server.listen(4000)
但是通常情况下,使用pump(泵)更加方便,也更加安全。
在编写管道的时候,尤其是作为一个单独的模块的时候。我们可能会希望将这些方法导出为外部的用户。这时候怎么办呢?
正如我们之前说的:管道由一系列传输流组成。我们将数据写入管道中的第一个流,然后数据通过它传输,直到写入最后一个流。
我们再看个例子:
const {createGzip} = require('zlib')
const {crateCipher, createCipheriv} = require('crypto')
const pumpify = require('pumpify')
const base64 = require('base64-encode-stream')
function pipeline(){
const stream1 = createGzip()
const stream2 = createCipheriv()
const stream3 = base64()
return pumpify(stream1,stream2,stream3)
}
这种写法有点类似Promisify
本文分享自 JavaScript高级程序设计 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!