前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2020-6-8-理解node中的stream的背压机制

2020-6-8-理解node中的stream的背压机制

作者头像
黄腾霄
发布2020-06-10 10:01:08
6380
发布2020-06-10 10:01:08
举报
文章被收录于专栏:黄腾霄的博客黄腾霄的博客

今天和大家聊一聊node中的stream的背压机制。


为什么要有流

在编写服务时,经常会需要涉及到文件或者数据压缩的问题。

使用合适的压缩算法能够有效减少请求文件的大小,从而减少网络中的数据传输量,提升响应速度。

假设我们采用最普通的方式处理一个文件的获取,压缩,发送这个过程,就会是如下所示的图形表示:

image-20200608104848227
image-20200608104848227

我们看到整个执行流程是串行的,所有耗时都会累加,导致整个过程耗时很长。

流的出现就是为了解决这个串行处理的问题。

如下图所示,我们将整个文件分成一个个小块,利用生产者消费者模式,上一个阶段的操作有一小部分完成后,

下一个阶段的操作就可以开始执行

image-20200608111829945
image-20200608111829945

这样从宏观上看,整个处理流程就可以并行执行,从而大大减少处理耗时。

image-20200608105448750
image-20200608105448750

背压问题

背压问题来源于生产者消费者模式中,消费者处理速度过慢。

比如说,我们下载过程,处理速度为3Mb/s,而压缩过程,处理速度为1Mb/s,这样的话,很快缓冲区队列就会形成堆积。

要么导致整个过程内存消耗增加,要么导致整个缓冲区慢,部分数据丢失。

什么是背压处理

背压处理可以理解为一个向上”喊话”的过程。

当压缩处理发现自己的缓冲区数据挤压超过阈值的时候,就对下载处理“喊话”,我忙不过来了,不要再发了。

下载处理收到消息就暂停向下发送数据。

image-20200608115814994
image-20200608115814994

而当缓存区处理至空时,又会重新通知下载处理,继续发送数据。

这样就能够实现,整个流的处理始终以保持以消费者速度进行消耗,不会引起重大积压。

pipe的生命周期

代码语言:javascript
复制
                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

从图中我们可以看到pipe对流的背压处理:

  • 将数据按照chunk进行划分,写入
  • 当chunk过大,或者队列忙碌时,暂停读取
  • 当队列为空时,继续读取数据。

参考文档:


本文会经常更新,请阅读原文: https://xinyuehtx.github.io/post/%E7%90%86%E8%A7%A3%E6%B5%81%E7%9A%84%E8%83%8C%E5%8E%8B%E6%9C%BA%E5%88%B6.html ,以避免陈旧错误知识的误导,同时有更好的阅读体验。

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。欢迎转载、使用、重新发布,但务必保留文章署名黄腾霄(包含链接: https://xinyuehtx.github.io ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请 与我联系

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-06-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要有流
  • 背压问题
    • 什么是背压处理
    • pipe的生命周期
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档