摘要:在学习Node的过程中,Stream流是常用的东东,在了解怎么使用它的同时,我们应该要深入了解它的具体实现。今天的主要带大家来写一写可读流的具体实现,就过来,就过来,上码啦!
在写代码之前我们首先要整理下思路,我们要做什么,以及怎么来做。本篇文章以文件可读流为例,一个可读流大体分为四步:
var readStream=require('readStream.js'); var rs=new readStream('test.txt',{ flags:'r', //打开文件的模式 autoClose:true, //结束是否自动关闭 encoding:'utf8', //字符编码 highWaterMark:3, //每次读取的字节数 start:0, //从下标为多少的位置开始读取,默认以0开始 end:3, //结束下标位置 });
rs.on('data',function(data){ console.log(data); })
二、接下来定义readStream这个模块 1.因为我们以文件的可读流来做的,在此我们要引入一个文件模块。还有一个事件模块,并且要继承它,每一个可读流都是‘events’的一个实例。 首先我们先初始化参数:
var fs=require('fs'); var EventEmitter=require('events'); class readStream extends EventEmitter{ constructor(path,options){ super(); this.path=path; this.flags=opitons.flags||'r'; this.autoClose=opitons.autoClose||true; this.encoding=options.encoding||null; this.highWaterMark=options.highWaterMark||64*1024; this.start=options.start||0; this.end=options.end; this.pos=this.start; this.buffer=Buffer.alloc(this.highWaterMark); this.flowing=null; this.open(); } }
以上除了初始化传递进来的参数,还加了几个pos,buffer,open(),flowing,为什么要加这些呢?这些值是来做什么用的?我们在此做出解答:
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ this.emit('err'); } this.fd=fd; this.emit('open'); }); }
2.1在打开文件的时候,如果文件打开报错,我们除了要触发错误事件外,还要注意一个参数。autoClose是指在文件读取完毕或抛出错误后,自己关闭文件。 于是我们根据这个参数值,在现有的open方法中对抛错的情况做出优化。
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ if(autoClose){ if(typeof this.fd === 'number'){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } this.emit('err'); } this.fd=fd; this.emit('open'); }) }
3.打开文件后,并不是立马读取,而是要检查是否有data事件绑定监听! 对此,我们要在构造函数内检查如果添加了data的事件监听
class readStream extends EventEmitter{ constructor(path,options){ super(); ... this.on('newListener',(eventName,callback)=>{ if(eventName=='data'){ this.flowing=true; this.read(); } }) } }
完成以上步骤后,我们要做的就是读取文件内容啦,下面来自定义一个read方法:
read(){ let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark; fs.read(this.fd,this.buffer,0,howToLength,this.pos,(err,bytesBase)=>{ if(bytesBase>0){ this.pos+=bytesBase; this.buf=this.buffer.slice(0,bytesBase); let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString(); this.emit('data',data); if(this.end>this.pos){ this.emit('end'); if(autoClose){ if(typeof this.fd === 'number'){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } } if(flowing){ this.read(); } }else{ this.emit('err'); if(typeof this.fd === 'number'){ if(autoClose){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } } }) }
到此,一个read方法就写的差不多了,但是有个问题是要注意的,open方法是异步的,有可能出现调用read方法时,this.fd还没有值。为了避免这个错误,我们改写一下read方法。
read(){ if(typeof this.fd !== 'number'){ this.once('open',()=>this.read()); } ... }
这样的话,一个基础的readStream类才算写完整。我们是不是要考虑下,有没有什么可以优化的地方?细心的伙伴是不是发现有重复的代码? 对,就是文件的关闭,我们提出一个destory方法,用作关闭文件。
destory(){ if(typeof this.fd==='number'){ if(autoClose){ fs.close(this.fd,()=>{ this.emit('close'); }); return ; } this.emit('close'); } }
三、扩展 方法的调用介绍变量flowing时,我们有提到'暂停'方法pause(),'重启'方法resume()来改变flowing的值。我们加入到代码中。
rs.on('data',(data)=>{ console.log(data); this.pause(); }); setTimeout(()=>{ this.resume(); },3000)
2.这两个方法的调用也是一样简单:
pause(){ this.flowing=false; } resume(){ this.flowing=true; this.read(); }
OK,大功告成了,下面整理出完整代码
var fs=require('fs'); var EventEmitter=require('events'); class readStream extends EventEmitter{ constructor(path,options){ super(); this.path=path; this.flages=options.flages||'r'; this.autoClose=options.autoClose||true; this.encoding=options.encoding||null; this.highWaterMark=options.highWaterMark||64*1024; this.end=options.end; this.start=opitons.start||0; this.pos=this.start; this.flowing=false; this.buffer=Buffer.alloc(this.highWaterMark); this.open(); this.on('newListener',(eventName,callback){ if(eventName=='data'){ this.flowing=true; fs.read(); } }); open(){ fs.open(this.path,this.flags,(err,fd){ if(err){ if(this.autoClose){ this.destory(); } this.emit('err',err); return ; } this.fd=fd; this.emit('open'); }); } destory(){ if(typeof this.fd ='number'){ fs.close(this.fd,()=>{ this.emit('close'); }); return ; } this.emit('close'); } read(){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this.read()); } let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark; fs.read(this.fd,this.buffer,0,howToLenghth,this.pos,(err,bytesBase)=>{ if(bytesBase>0){ this.pos+=bytesBase; let buf=this.buffer.slice(0,bytesBase); let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString(); this.emit('data',data); if(this.pos>this.end){ this.emit('end'); this.destory(); } if(flowing){ this.read() } }else{ this.emit('err'); this.destory(); } }) } pause(){ this.flowing=false; } resume(){ this.flowing=true; this.read(); } } }
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句