专栏首页YuanXinNodeJS模块研究 - cluster

NodeJS模块研究 - cluster

Nodejs 提供了 cluster 来支持服务集群的扩展,提高多核 CPU 的利用效率,实现负载均衡,最大程度利用机器性能。本文从以下几个方面介绍 cluster 的 API 和用法:

  • cluster 启动 HTTP 服务器
  • 如何进行广播?
  • 如何实现状态共享?
  • 如何处理进程退出?
  • 更多进程控制方法:心跳保活、自动重启、负载检测

cluster 启动 HTTP 服务器

为了方便测试,全局安装 autocannon:

npm install -g autocannon

不借助 cluster,编写一个简单的 http 服务器:

const http = require("http");

http.createServer((req, res) => {
    // 模拟cpu计算
    for (let i = 0; i < 100000; ++i) {}
    res.statusCode = 200;
    res.end("hello world!");
}).listen(4000);

借助 autocannon 开启 1000 个连接,每个连接的请求次数为 10 次,压测结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor

┌─────────┬──────┬──────┬────────┬────────┬──────────┬───────────┬────────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg      │ Stdev     │ Max        │
├─────────┼──────┼──────┼────────┼────────┼──────────┼───────────┼────────────┤
│ Latency │ 0 ms │ 0 ms │ 636 ms │ 650 ms │ 62.48 ms │ 197.51 ms │ 2928.64 ms │
└─────────┴──────┴──────┴────────┴────────┴──────────┴───────────┴────────────┘
┌───────────┬─────────┬─────────┬─────────┬─────────┬──────────┬────────┬─────────┐
│ Stat      │ 1%      │ 2.5%    │ 50%     │ 97.5%   │ Avg      │ Stdev  │ Min     │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Req/Sec   │ 13095   │ 13095   │ 15911   │ 16303   │ 15558.91 │ 901.48 │ 13092   │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Bytes/Sec │ 1.47 MB │ 1.47 MB │ 1.78 MB │ 1.83 MB │ 1.74 MB  │ 101 kB │ 1.47 MB │
└───────────┴─────────┴─────────┴─────────┴─────────┴──────────┴────────┴─────────┘

Req/Bytes counts sampled once per second.

171k requests in 11.17s, 19.2 MB read
50 errors (0 timeouts)

然后用 cluster 模块来启动一个利用多核的 http 服务器,代码如下:

const cluster = require("cluster");
const http = require("http");
const os = require("os");

if (cluster.isMaster) {
    const cpuNum = os.cpus().length;
    for (let i = 0; i < cpuNum; ++i) {
        cluster.fork();
    }
} else {
    runServer();
}

function runServer() {
    http.createServer((req, res) => {
        for (let i = 0; i < 100000; ++i) {}
        res.statusCode = 200;
        res.end("hello world!");
    }).listen(4000);
}

同样利用 autocannon 进行测试,结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor

┌─────────┬──────┬──────┬────────┬────────┬─────────┬──────────┬──────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg     │ Stdev    │ Max      │
├─────────┼──────┼──────┼────────┼────────┼─────────┼──────────┼──────────┤
│ Latency │ 0 ms │ 0 ms │ 113 ms │ 125 ms │ 11.5 ms │ 37.37 ms │ 807.5 ms │
└─────────┴──────┴──────┴────────┴────────┴─────────┴──────────┴──────────┘
┌───────────┬────────┬────────┬─────────┬─────────┬─────────┬──────────┬────────┐
│ Stat      │ 1%     │ 2.5%   │ 50%     │ 97.5%   │ Avg     │ Stdev    │ Min    │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Req/Sec   │ 43711  │ 43711  │ 97023   │ 108671  │ 90811.2 │ 16898.34 │ 43710  │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Bytes/Sec │ 4.9 MB │ 4.9 MB │ 10.9 MB │ 12.2 MB │ 10.2 MB │ 1.89 MB  │ 4.9 MB │
└───────────┴────────┴────────┴─────────┴─────────┴─────────┴──────────┴────────┘

Req/Bytes counts sampled once per second.

908k requests in 10.7s, 102 MB read

可以看到,错误请求从 50 降低到 0,最长请求延迟从 2.9s 降低到了 0.8s,平均请求量从 1.5w 提升到了 9w,平均下载量从 1.74MB 提升到了 10.2MB。而本机的os.cpus().length返回的结果是 12,提升非常稳定,和 cpu 核数基本成正比。

从上面的实践也看到,从 cluster 开启的子进程总数量最好和 cpu 数量一样。

如何进行广播?

广播需要父子进程之间进行通信,多用于消息下发、数据共享。cluster 是基于 child_process 模块的,所以通信的做法和 child_process 区别不大。

在主进程中, cluster.workders 是个哈希表,可以遍历得到所有工作进程。如下所示,给所有的工作进程广播消息:

if (cluster.isMaster) {
    for (let i = 0; i < os.cpus().length; ++i) {
        cluster.fork();
    }
    // 给工作进程广播消息
    for (const id in cluster.workers) {
        cluster.workers[id].send({
            data: "msg"
        });
    }
} else if (cluster.isWorker) {
    // 工作进程接受到消息
    process.on("message", msg => {
        console.log("msg is", msg);
    });
}

如何实现状态共享?

在上一个例子中,看到了借助 cluster.workers 和事件机制,来进行消息广播。但由于集群的每个节点是“分散”,所以对于有状态的服务应该想办法解决“状态共享”这个问题。

例如有需要我们进行总访问量统计的需求,并且将当前的访问量返回给客户端。由于每个进程都承载了一部分访问,工作进程接收到请求的时候,需要向主进程上报;工作进程接收到上报,更新访问总量,并且广播给各个工作进程。这就是一个完整的消息上报 => 状态更新 => 消息广播的过程。

按照上面的思路,首先封装工作进程的 http 逻辑,如下所示:

function runServer() {
    let visitTotal = 0;
    // 接收主进程的广播
    process.on("message", msg => {
        if (msg.tag === "broadcast") visitTotal = msg.visitTotal;
    });

    http.createServer((req, res) => {
        // 消息上报给主进程
        process.send({
            tag: "report"
        });
        res.statusCode = 200;
        res.end(`visit total times is ${visitTotal + 1}`);
    }).listen(4000);
}

是的,就是通过传递消息上的一个字段,来标识是工作进程上报的消息还是主进程广播的消息。给主进程用的 broadcast() 函数如下:

function broadcast(workers, data) {
    for (const id in workers) {
        // 给工作进程广播消息
        workers[id].send({
            tag: "broadcast",
            ...data
        });
    }
}

最后,主进程中需要为工作进程添加message事件的监听器,这样才能收到工作进程的消息,并且更新保存在主进程中的状态(visitTotal),完成广播。代码如下:

if (cluster.isMaster) {
    let visitTotal = 0; // 访客总人数

    const cpuNum = os.cpus().length;
    for (let i = 0; i < cpuNum; ++i) {
        cluster.fork();
    }

    for (const id in cluster.workers) {
        cluster.workers[id].on("message", msg => {
            if (msg.tag === "report") {
                ++visitTotal;
                broadcast(cluster.workers, { visitTotal });
            }
        });
    }
} else if (cluster.isWorker) {
    runServer();
}

其实,更常用的做法是专门准备一个服务器来进行统计,将服务单独部署。这里是为了深入理解和学习 cluster 模块。

如何处理进程退出?

cluster 模块中有 2 个 exit 事件:一个是 Worker 上的,仅用于工作进程中;另一个是主进程上,任何一个工作进程关闭都会触发。

在工作进程正常退出的时候,code 为 0,并且 Worker 上的 exitedAfterDisconnect 属性为 true。那么检测 code 和 exitedAfterDisconnect 属性,就能判断进程是否是异常退出。并且重新 fork 一个新的工作进程,来保持服务稳定运行。代码如下:

cluster.on("exit", (worker, code, signal) => {
    if (code || !worker.exitedAfterDisconnect) {
        console.log(`${worker.id} 崩溃,重启新的子进程`);
        cluster.fork();
    }
});

注意,exitedAfterDisconnect 属性在正常退出、调用 worker.kill() 或调用 worker.disconnect()时,均被设置为 true。因为调用 kill 和 disconnect 均为代码逻辑主动执行,属于程序的一部分。

更多进程控制方法:心跳保活、自动重启、负载检测

除了前面所讲的方法,进程控制的常见方法还有:心跳保活、自动重启、负载检测。

心跳保活:工作进程定时向主进程发送心跳包,主进程如果检测到长时间没有收到心跳包,要关闭对应的工作进程,并重启新的进程。

自动重启:给每个工作进程设置一个“生命周期”,例如 60mins。到时间后,通知主进程进行重启。

负载检测:工作进程和主进程可以定期检测 cpu 占用率、内存占用率、平均负载等指标,过高的话,则关闭重启对应工作进程。关于检测方法可以看这篇文章《NodeJS 模块研究 - os》

这些方法在 vemojs 中都有应用,具体可以看这篇文章:《VemoJS 源码拆解》

参考链接

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 谈谈promise/async/await的执行顺序与V8引擎的BUG

    细心的朋友肯定会发现前面第 6 步,如果async2函数是没有async关键词修饰的一个普通函数呢?

    心谭博客
  • LeetCode 684.冗余连接 - JavaScript

    输入一个图,该图由一个有着 N 个节点 (节点值不重复 1, 2, …, N) 的树及一条附加的边构成。附加的边的两个顶点包含在 1 到 N 中间,这条附加的边...

    心谭博客
  • VemoJS源码拆解

    按照命名,肯定是封装一些常用的方法。这里只提供了一个 cpuNum 的 getter 接口。

    心谭博客
  • Java 8 Stream Api 中的 skip 和 limit 操作

    Java 8 Stream API 中的skip()和limit()方法具有类似的作用。它们都是对流进行裁剪的中间方法。今天我们来探讨一下这两个方法。

    码农小胖哥
  • 响铃:爆雷=靴子落地,P2P或不再负重前行

    7月9日,2013年年初成立、交易额已经累计325亿元的钱爸爸,一下子变成了“囧爸爸”。“经侦部门介入调查”这一幕又开始频繁发生在P2P领域,看客们用“爆雷”来...

    曾响铃
  • 程序化广告出现的原因

    自互联网出现开始,从业者就不断对探索着如何进行商业变现,从最开始的文字超链广告到后面的Display Banner、关键字竞价,程序化广告,智能自动出价……

    GA小站
  • 成为java高级程序员需要掌握哪些

    精讲java
  • 转:成为Java高级程序员需要掌握哪些?

    1、Core Java,就是Java基础、JDK的类库,很多童鞋都会说,JDK我懂,但是懂还不足够,知其然还要知其所以然,JDK的源代码写的非常好,要经常查看,...

    技术zhai
  • 回昨天的美女面试官问题

    项勇
  • 信息安全不可或缺应用交付 还需安全交付

    从负载均衡、应用交付到交付安全,在短短的十数年里,交付领域的变化可谓日新月异,从软件到硬件,从4层负载到7层应用,而这两年,主流厂商又将应用交付的大旗指向了信息...

    企鹅号小编

扫码关注云+社区

领取腾讯云代金券