前言:今天下载了Node.js最新版代码,并为Node.js的TCP模块增加了SO_RESUEPORT的能力,本文介绍一下具体的实现,关于SO_RESUEPORT的知识可以参考之前的文章或者网上文章。
SO_RESUEPORT是操作系统内核提供的能力,所以第一步首先修改Libuv。考虑到操作系统兼容性的问题,目前只支持Linux系统,旧版Mac OS也支持相关属性但是效果不符合预期,新版Mac OS倒是支持,考虑到Node.js在几乎都是部署到Linux,所以可以先关注Linux内核。首先修改deps/uv/include/uv.h。
enum uv_tcp_flags {
UV_TCP_IPV6ONLY = 1,
// 支持SO_RESUEPORT flags
UV_TCP_REUSEPORT = 2};
接着修改deps/uv/src/unix/tcp.c。
#if defined(SO_REUSEPORT) && defined(__linux__)
on = 1;
if ((flags & UV_TCP_REUSEPORT) && setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)))
return UV__ERR(errno);
#endif
这里判断一下是否有两个宏,有的话才能使用SO_RESUEPORT。如果支持则通过setsockopt设置socket的SO_REUSEPORT标记,这是最核心的逻辑。
修改完底层的Libuv后,继续修改C++层,因为这是一个可选的属性,所以我们需要增加相关的逻辑。修改src/tcp_wrap.cc。首先导出一个新的常量
#if defined(SO_REUSEPORT) && defined(__linux__)
NODE_DEFINE_CONSTANT(constants, UV_TCP_REUSEPORT);#endif
在JS层可以通过判断是否导出了这个常量来判断系统是否支持SO_RESUEPORT。接着修改bind函数,因为我们再bind的时候可以设置SO_RESUEPORT。
template <typename T>void TCPWrap::Bind( const FunctionCallbackInfo<Value>& args,
int family,
std::function<int(const char* ip_address, int port, T* addr)> uv_ip_addr) {
TCPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
Environment* env = wrap->env();
node::Utf8Value ip_address(env->isolate(), args[0]);
int port;
unsigned int flags = 0;
if (!args[1]->Int32Value(env->context()).To(&port)) return;
// ipv6支持ipv6Only和SO_RESUEPORT
if (family == AF_INET6 &&
!args[2]->Uint32Value(env->context()).To(&flags)) {
return;
// ipv4之前是不支持任何标记的,这里需要加上这个逻辑,因为我们需要支持SO_RESUEPORT
} else if (family == AF_INET4 &&
!args[2]->Uint32Value(env->context()).To(&flags)) {
return;
}
T addr;
int err = uv_ip_addr(*ip_address, port, &addr);
if (err == 0) {
err = uv_tcp_bind(&wrap->handle_,
reinterpret_cast<const sockaddr*>(&addr),
flags);
}
args.GetReturnValue().Set(err);}
C++主要是完成透传flags的逻辑。
修改JS层是最复杂的地方,主要是为了应用层的兼容性问题。也就是说如果Node.js真的支持了SO_RESUEPORT,在某些平台不支持SO_RESUEPORT的情况下,我们如何能保证我们的代码能在各个平台上跑。简单来说,如果我们平台支持SO_RESUEPORT,我们可以开启多个子进程,然后分别执行以下代码。
const http = require('http');
http.createServer((req, res) => {
res.end('hello');}).listen({port: 8000, reuseport: true});
这时候,只需要修改一下Node.js的net.js,把reuseport标记传到C++层再传到Libuv就行,但是问题是,如果我们这样写代码,就无法在不支持SO_RESUEPORT的平台跑了,因为会导致重复监听端口的错误。所以为了兼容性,我想的方案是利用Cluster模块,目前Cluster模块支持轮询和共享两种模式,那么我们再加一种reuseport模式就好了,这样的好处是一旦我们平台不支持SO_RESUEPORT,我们可以降级到Node.js现在到模式。我们知道Cluster模块的原理有两种,一种是主进程监听,分发连接给子进程,另一种是主进程创建socket,通过文件描述符传递的方式传给子进程,所有的进程都是共享一个socket的。下面我们看看怎么做。首先修改lib/internal/cluster/primary.js。
// 增加这if的逻辑
if ((message.addressType === 4 ||
message.addressType === 6) &&
(message.flags & TCPConstants.UV_TCP_REUSEPORT)) {
handle = new ReusePort(key, address, message);
} else if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
handle = new SharedHandle(key, address, message);
} else {
handle = new RoundRobinHandle(key, address, message);
}
我们在queryServer函数里增加了一个if的逻辑。如果addressType是4或6说明是TCP协议,并且设置了UV_TCP_REUSEPORT(listen的时候传入),就会走到reuseport的逻辑,剩下的两个else是目前Node.js的逻辑。我们看看ReusePort.js做了什么。
'use strict';const assert = require('internal/assert');const net = require('net');const { constants: TCPConstants } = internalBinding('tcp_wrap');
module.exports = ReusePort;
function ReusePort(key, address, {port, addressType, fd, flags}) {
this.key = key;
this.workers = [];
this.handles = [];
this.list = [address, port, addressType, fd, flags];}
ReusePort.prototype.add = function(worker, send) {
assert(!this.workers.includes(worker));
const rval = net._createServerHandle(...this.list);
let errno;
let handle;
if (typeof rval === 'number')
errno = rval;
else
handle = rval;
this.workers.push(worker);
this.handles.push(handle);
send(errno, null, handle);};
ReusePort.prototype.remove = function(worker) {
const index = this.workers.indexOf(worker);
if (index === -1)
return false; // The worker wasn't sharing this handle.
this.workers.splice(index, 1);
this.handles[index].close();
this.handles.splice(index, 1);
return true;};
上面的代码我们只需要关注net._createServerHandle。在不能多个进程同时监听同一个端口的情况下,Node.js只会调net._createServerHandle创建一个socket,然后多个进程共享。而我们这里会给每个进程创建一个socket。这个socket就是在子进程调用queryServer的时候返回给子进程的。剩下的逻辑我们暂时不用关注。最后看一下_createServerHandle的逻辑。
const handle = new TCP(TCPConstants.SERVER);if (addressType === 6) { err = handle.bind6(address, port, flags);} else { err = handle.bind(address, port, flags || 0);}
_createServerHandle的逻辑是创建一个socket并且给socket绑定IP和端口,我们看到这里会给C++层传入flags,C++层就会传到LIbuv了,这样我们就完成了整个过程,整体的流程如下。
1 子进程执行listen的时候,传入reuseport为true
2 子进程通过进程间通信请求主进程
3 主进程返回一个新的socket并绑定到对应的地址
4 子进程执行listen启动服务器。
接下来我们看看如何使用,首先创建一个server.js。
const cluster = require('cluster');const os = require('os');const http = require('http');const cpus = os.cpus().length;
if (cluster.isPrimary) { const map = {};
for (let i = 0; i < cpus; i++) {
const worker = cluster.fork();
map[worker.process.pid] = 0;
worker.on('message', (pid) => {
map[pid]++;
});
}
process.on('SIGINT', () => {
console.log(map);
});} else { http.createServer((req, res) => {
process.send(process.pid);
res.end('hello');
}) .listen({reuseport: true, port: 8000});}
再创建一个客户端client.js
const http = require('http');
function connect() {
setTimeout(() => {
http.get('http://localhost:8000/', (res) => {
console.log(res.statusCode);
connect();
});
}, 50);}connect();
客户端串行访问服务器,我们看到使用方式和目前Node.js的Cluster使用一样。即使我们把reuseport改成false或者其他平台跑也没问题,效果如下
我们看到在reuseport的情况下,负载还是挺均衡的。
后记:目前是通过listen的时候传入参数去控制是否开启SO_RESUEPORT的,后续可以增加通过设置cluster.schedulingPolicy的方式,和目前共享、轮询模式对齐,考虑到Cluster模块不是必须,因为我们可以直接用子进程模块监听同一个端口。所以通过listen函数去控制是非常必要的。目前通过修改Node.js内核大概体验了一下SO_RESUEPORT,后续review和改进一下代码。有兴趣的同学可以参考和编译运行,如果你有想法欢迎交流。
仓库:
https://github.com/theanarkh/node的add-reuseport分支
改动:
https://github.com/theanarkh/node/pull/1
Libuv pr:
https://github.com/libuv/libuv/pull/3198