巧妙复制一个流

场景

实际业务中可能出现重复消费一个可读流的情况,比如在前置过滤器解析请求体,拿到body进行相关权限及身份认证;认证通过后框架或者后置过滤器再次解析请求体传递给业务上下文。因此,重复消费同一个流的需求并不奇葩,这类似于js上下文中通过 deep clone一个对象来操作这个对象副本,防止源数据被污染。

const Koa = require('koa');
const app = new Koa();

let parse = function(ctx){
    return new Promise((res)=>{
        let chunks = [],len  = 0, body = null;
        ctx.req.on('data',(chunk)=>{
            chunks.push(chunk)
            len += chunk.length
        });
        ctx.req.on('end',()=>{
            body = (Buffer.concat(chunks,len)).toString();
            res(body);
        });
    })
}
// 认证
app.use(async (ctx,next) => {
    let body = JSON.parse(decodeURIComponent(await parse(ctx)));
    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})
// 解析body体,传递给业务层
app.use(async (ctx,next) => {
    let body = await parse(ctx);
    ctx.postBody = body;
    await next();
})
app.use(async ctx => {
  ctx.body = 'Hello World\n';
  ctx.body += `post body: ${ctx.postBody}`;
});

app.listen(3000);

上述代码片段无法正常运行,请求无法得到响应。这是因为在前置过滤器的认证逻辑中消费了请求体,在第二级过滤器中就无法再次消费请求体,因此请求会阻塞。实际业务中,认证逻辑往往是与每个公司规范相关的,是一个“二方库”;而示例中的第二季过滤器则通常作为一个三方库存在,因此为了不影响第三方包消费请求体,必须在认证的二方包中保存 ctx.req 这个可读流的数据仍然存在,这就涉及到本文的主旨了。

实现

复制流并不像复制一个对象一样简单与直接,流的使用是一次性的,一旦一个可读流被消费(写入一个Writeable对象中),那么这个可读流就是不可再生的,无法再使用。可是通过一些简单的技巧可以再次复原一个可读流,不过这个复原出来的流虽然内容和之前的流相同,但却不是同一个对象了,因此这两个对象的属性及原型都不同,这往往会影响后续的使用,不过办法总是有的,且看下文。

实现一:可读流的“影分身之术”

可读流的“影分身之术”和鸣人的差不多,不过仅限于被克隆对象的 这一特性,即保证克隆出的流有着相同的数据。但是克隆出来的流却无法拥有原对象的其他属性,但我们可通过原型链继承的方式实现属性及方法的继承。

let Readable = require('stream').Readable;
let fs = require('fs');
let path = require('path');

class NewReadable extends Readable{
    constructor(originReadable){
        super();
        this.originReadable = originReadable;
        this.start();
    }

    start() {
        this.originReadable.on('data',(chunck)=>{
            this.push(chunck);
        });

        this.originReadable.on('end',()=>{
            this.push(null);
        });
        
        this.originReadable.on('error',(e)=>{
            this.push(e);
        });
    }

    // 作为Readable的实现类,必须实现_read函数,否则会throw Error
    _read(){
    }
}

app.use(async (ctx,next) => {
    let cloneReq = new NewReadable(ctx.req);
    let cloneReq2 = new NewReadable(ctx.req);
    // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上

    // 消费cloneReq,获取认证数据
    let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));

    // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新给ctx.req复制,留给后续过滤器消费
    ctx.req = cloneReq2;

    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})

点评: 这种影分身之术可以同时复制出多个可读流,同时需要针对原来的流重新进行赋值,并继承原有属性,这样才能不影响后续的重复消费。

实现二:懒人实现

stream模块有一个特殊的类,即 Transform。关于Transfrom的特性,我曾在 深入node之Transform 一文中详细介绍过,他拥有可读可写流双重特性,那么利用Transfrom可以快速简单的实现克隆。

首先,通过 pipe 函数将可读流导向两个 Transform流(之所以是两个,是因为需要在前置过滤器消费一个流,后续的过滤器消费第二个)。

let cloneReq = new Transform({
    highWaterMark: 10*1024*1024,
    transform: (chunk,encode,next)=>{
        next(null,chunk);
    }
});
let cloneReq2 = new Transform({
    highWaterMark: 10*1024*1024,
    transform: (chunk,encode,next)=>{
        next(null,chunk);
    }
});
ctx.req.pipe(cloneReq)
ctx.req.pipe(cloneReq2)

上述代码中,看似 ctx.req 流被消费(pipe)了两次,实际上 pipe 函数则可以看成 Readable和Writeable实现backpressure的一种“语法糖”实现,具体可通过 node中的Stream-Readable和Writeable解读 了解,因此得到的结果就是“ctx.req被消费了一次,可是数据却复制在cloneReq和cloneReq2这两个Transfrom对象的读缓冲区里,实现了clone”

其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

最后,在数据复制的同时,再给其中一个对象复制额外的属性即可:

// 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
cloneReq2.__proto__ = ctx.req;
// 此后重新给ctx.req复制,留给后续过滤器消费
ctx.req = cloneReq2;

至此,通过Transform实现clone已完成。完整的代码如下(最前置过滤器):

// 认证
app.use(async (ctx,next) => {
    // let cloneReq = new NewReadable(ctx.req);
    // let cloneReq2 = new NewReadable(ctx.req);
    let cloneReq = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    let cloneReq2 = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    ctx.req.pipe(cloneReq)
    ctx.req.pipe(cloneReq2)
    // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上

    // 消费cloneReq,获取认证数据
    let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));

    // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新给ctx.req复制,留给后续过滤器消费
    ctx.req = cloneReq2;

    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})

说明

  1. ctx.req执行两次pipe到对应cloneReq和cloneReq2,然后立即消费cloneReq对象,这样合理吗?如果源数据够大,pipe还未结束就在消费cloneReq,会不会有什么问题? 其实 pipe函数里面大多是异步操作,即针对 源和目的流做的一些流控措施。目的流使用的是cloneReq对象,该对象在实例化的过程中 transform函数直接通过调用next函数将接受到的数据传入到Transform对象的可读流缓存中,同时触发‘readable和data事件’。这样,我们在下文消费cloneReq对象也是通过“侦听data事件”实现的,因此即使ctx.req的数据仍没有被消费完,下文仍可以正常消费cloneReq对象。数据流仍然可以看做是从ctx.req --> cloneReq --> 消费。
  2. 使用Transform流实现clone 可读流的弊端: 上例中,Transfrom流的实例化传入了一个参数 highWaterMark,该参数在Transfrom中的作用 在 上文 深入node之Transform 中有过详解,即当Transfrom流的读缓冲大小 < highWaterMark时,Transfrom流就会将接收到的数据存储在读缓冲里,等待消费,同时执行 transfrom函数;否则什么都不做。 因此,当要clone的源内容大于highWaterMark时,就无法正常使用这种方式进行clone了,因为由于源内容>highWaterMark,在没有后续消费Transfrom流的情况下就不执行transfrom方法(当Transfrom流被消费时,Transfrom流的读缓冲就会变小,当其大小<highWaterMark时,又可以执行transfrom方法继续存储源数据),无法存储源文件内容。 所以设置一个合理的highWaterMark大小很重要,默认的highWaterMark为 16kB。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯Bugly的专栏

Android 平台 Native 代码的崩溃捕获机制及实现

一、背景 在Android平台,native crash一直是crash里的大头。native crash具有上下文不全、出错信息模糊、难以捕捉等特点,比jav...

1.2K70
来自专栏黑泽君的专栏

day58_BOS项目_10

之前的请假流程,是没有实际意义的,我们要使得我们流程变得有意义(有实际意义),需要在流程向下推进的过程中带着数据推进才有意义。如下图所示:

10440
来自专栏逸鹏说道

C# 温故而知新: 线程篇(四)

线程同步篇 (中):同步工具类的介绍 1 上篇回顾 2 继续介绍基元内核模式中的 monitor类 3 同步句柄:WaitHandle 4 EventW...

32060
来自专栏Java帮帮-微信公众号-技术文章全总结

Java并发学习2【面试+工作】

  关键字synchronized的作用是实现进程间的同步。它的工作是对同步的代码加锁,使得每一次,只能有一个线程进入同步块,从而保证线程间的安全性(即同步块每...

11420
来自专栏Java成神之路

【转】 Java 多线程之一

进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种资源和状态信息,包括...

11730
来自专栏JackieZheng

Spring实战——缓存

缓存 提到缓存,你能想到什么?一级缓存,二级缓存,web缓存,redis…… 你所能想到的各种包罗万象存在的打着缓存旗号存在的各种技术或者实现,无非都是宣扬缓...

224100
来自专栏王清培的专栏

.NET/ASP.NET MVC Controller 控制器(深入解析控制器运行原理)

阅读目录: 1.开篇介绍 2.ASP.NETMVC Controller 控制器的入口(Controller的执行流程) 3.ASP.NETMVC Contro...

21960
来自专栏阿杜的世界

使用SA分析内存溢出问题背景例子程序方式方法实践参考资料

在阅读《Java性能调优指南》一书的最后,书中介绍了Serviceability Agent,并给出了一些排查问题的示例,我感觉看书不够深刻,因此自己在macO...

15520
来自专栏木宛城主

Unity应用架构设计(10)——绕不开的协程和多线程(Part 2)

在上一回合谈到,客户端应用程序的所有操作都在主线程上进行,所以一些比较耗时的操作可以在异步线程上去进行,充分利用CPU的性能来达到程序的最佳性能。对于Unit...

408110
来自专栏一枝花算不算浪漫

页面静态化技术Freemarker技术的介绍及使用实例.

42960

扫码关注云+社区

领取腾讯云代金券