前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >原创:带你从零看清Node源码createServer和负载均衡整个过程

原创:带你从零看清Node源码createServer和负载均衡整个过程

作者头像
theanarkh
发布2020-02-25 15:17:52
1.1K0
发布2020-02-25 15:17:52
举报
文章被收录于专栏:原创分享原创分享

以下文章来源于前端巅峰 ,作者Peter 谭金杰

写在开头:

作为一名曾经重度使用Node.js作为即时通讯客户端接入层的开发人员,无法避免调试V8,配合开发addon。于是对Node.js源码产生了很大的兴趣~

顺便吐槽一句,Node的内存控制,由于是自动回收,我之前做的产品是20万人超级群的IM产品,像一秒钟1000条消息,持续时间长了内存和CPU占用还是会有一些问题

之前写过cluster模块源码分析、PM2原理等,感觉兴趣的可以去公众号翻一翻

Node.js的js部分源码基本看得差不多了,今天写一个createServer过程给大家,对于不怎么熟悉Node.js源码的朋友来说,可能是一个不错的开始,源码在gitHub上有,直接克隆即可,最近一直比较忙,公司和业余的工作也是,所以原创比较少。


原生Node.js创建一个基本的服务:

代码语言:javascript
复制
var http = require('http');
http.createServer(function (request, response) {
// 发送 HTTP 头部 
// HTTP 状态值: 200 : OK
// 内容类型: text/plain
response.writeHead(200, {'Content-Type': 'text/plain'});
// 发送响应数据 "Hello World"
response.end('Hello World\n');
}).listen(8888);
// 终端打印如下信息
console.log('Server running at http://127.0.0.1:8888/');

我们目前只分析Node.js源码的js部分的

首先找到Node.js源码的lib文件夹

然后找到http.js文件

发现createServer真正返回的是new Server,而 Server来自_http_server

于是找到同目录下的_http_server.js文件,发现整个文件有800行的样子,全局搜索Server找到函数

代码语言:javascript
复制
function Server(options, requestListener) {
if (!(this instanceof Server)) return new Server(options, requestListener);
if (typeof options === 'function') {
    requestListener = options;
    options = {};
  } else if (options == null || typeof options === 'object') {
    options = { ...options };
  } else {
throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
  }
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
  net.Server.call(this, { allowHalfOpen: true });
if (requestListener) {
this.on('request', requestListener);
  }

createServer函数解析:

  • 参数控制有点像redux源码里的initState和reducer,根据传入类型不同,做响应的处理
  • this.on('request', requestListener);}
  • 每次有请求,就会调用requestListener这个回调函数
  • 至于IncomingMessage和ServerResponse,请求是流,响应也是流,请求是可读流,响应是可写流,当时写那个静态资源服务器时候有提到过
  • 那么怎么可以链式调用?有人可能会有疑惑。Node.js源码遵循commonJs规范,大都挂载在prototype上,所以函数开头有,就是确保可以链式调用
代码语言:javascript
复制
if (!(this instanceof Server)) 
return new Server(options, requestListener);

上面已经将onrequest事件触发回调函数讲清楚了,那么链式调用listen方法,监听端口是怎么回事呢?

传统的链式调用,像JQ源码是return this , 手动实现A+规范的Promise则是返回一个全新的Promise,然后Promise原型上有then方法,于是可以链式调用


怎么实现.listen链式调用,重点在这行代码:

代码语言:javascript
复制
 net.Server.call(this, { allowHalfOpen: true });

allowHalfOpen实验结论: 这里TCP的知识不再做过度的讲解

代码语言:javascript
复制
(1)allowHalfOpen为true,一端发送FIN报文:
进程结束了,那么肯定会发送FIN报文;
进程未结束,不会发送FIN报文
(2)allowHalfOpen为false,一端发送FIN报文:
进程结束了,肯定发送FIN报文;
进程未结束,也会发送FIN报文;

于是找到net.js文件模块中的Server函数

代码语言:javascript
复制
function Server(options, connectionListener) {
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  EventEmitter.call(this);

  if (typeof options === 'function') {
    connectionListener = options;
    options = {};
    this.on('connection', connectionListener);
  } else if (options == null || typeof options === 'object') {
    options = { ...options };

    if (typeof connectionListener === 'function') {
      this.on('connection', connectionListener);
    }
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  this._connections = 0;

  Object.defineProperty(this, 'connections', {
    get: deprecate(() => {

      if (this._usingWorkers) {
        return null;
      }
      return this._connections;
    }, 'Server.connections property is deprecated. ' +
       'Use Server.getConnections method instead.', 'DEP0020'),
    set: deprecate((val) => (this._connections = val),
                   'Server.connections property is deprecated.',
                   'DEP0020'),
    configurable: true, enumerable: false
  });

  this[async_id_symbol] = -1;
  this._handle = null;
  this._usingWorkers = false;
  this._workers = [];
  this._unref = false;

  this.allowHalfOpen = options.allowHalfOpen || false;
  this.pauseOnConnect = !!options.pauseOnConnect;
}

这里巧妙的通过.call调用net模块Server函数,保证了this指向一致

this._handle = null 这里是因为Node.js考虑到多进程问题,所以会hack掉这个属性,因为.listen方法最终会调用_handle中的方法,多个进程只会启动一个真正进程监听端口,然后负责分发给不同进程,这个后面会讲

Node.js源码的几个特色:

  1. 遵循conmonjs规范,很多方法挂载到prototype上了
  2. 很多object.definepropoty数据劫持
  3. this指向的修改,配合第一个进行链式调用
  4. 自带自定义事件模块,很多内置的函数都继承或通过Object.setPrototypeOf去封装了一些自定义事件
  5. 代码模块互相依赖比较多,一个.listen过程就很麻烦,初学代码者很容易睡着
  6. 源码学习,本就枯燥。没什么好说的了

我在net.js文件模块中发现了一个原型上.listen的方法:

代码语言:javascript
复制
Server.prototype.listen = function(...args) {
  const normalized = normalizeArgs(args);
  var options = normalized[0];
  const cb = normalized[1];

  if (this._handle) {
    throw new ERR_SERVER_ALREADY_LISTEN();
  }

  if (cb !== null) {
    this.once('listening', cb);
  }
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]);  // (port, host, backlog)

  options = options._handle || options.handle || options;
  const flags = getFlags(options.ipv6Only);
  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }
  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }
  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  var backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new ERR_SOCKET_BAD_PORT(options.port);
    }
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    var pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }

    let mode = 0;
    if (options.readableAll === true)
      mode |= PipeConstants.UV_READABLE;
    if (options.writableAll === true)
      mode |= PipeConstants.UV_WRITABLE;
    if (mode !== 0) {
      const err = this._handle.fchmod(mode);
      if (err) {
        this._handle.close();
        this._handle = null;
        throw errnoException(err, 'uv_pipe_chmod');
      }
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
};

这个就是我们要找的listen方法,可是里面很多ipv4和ipv6的处理,最重要的方法是listenInCluster


这个函数需要好好看一下,只有几十行

代码语言:javascript
复制
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      var ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

如果是主进程,那么就直接调用_.listen2方法了

代码语言:javascript
复制
Server.prototype._listen2 = setupListenHandle;

找到setupListenHandle函数

代码语言:javascript
复制
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    var rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      if (typeof rval === 'number') {
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    if (typeof rval === 'number') {
      var error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    var ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}

里面的createServerHandle是重点

代码语言:javascript
复制
function createServerHandle(address, port, addressType, fd, flags) {
  var err = 0;
  // Assign handle in listen, and clean up if bind or listen fails
  var handle;

  var isTCP = false;
  if (typeof fd === 'number' && fd >= 0) {
    try {
      handle = createHandle(fd, true);
    } catch (e) {
      // Not a fd we can listen on.  This will trigger an error.
      debug('listen invalid fd=%d:', fd, e.message);
      return UV_EINVAL;
    }

    err = handle.open(fd);
    if (err)
      return err;

    assert(!address && !port);
  } else if (port === -1 && addressType === -1) {
    handle = new Pipe(PipeConstants.SERVER);
    if (process.platform === 'win32') {
      var instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
      if (!Number.isNaN(instances)) {
        handle.setPendingInstances(instances);
      }
    }
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
    debug('bind to', address || 'any');
    if (!address) {
      // Try binding to ipv6 first
      err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
      if (err) {
        handle.close();
        // Fallback to ipv4
        return createServerHandle(DEFAULT_IPV4_ADDR, port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }

  if (err) {
    handle.close();
    return err;
  }

  return handle;
}

已经可以看到TCP了,离真正的绑定监听端口,更近了一步

最终通过下面的方法绑定监听端口

代码语言:javascript
复制
 handle.bind6(address, port, flags);
 或者
 handle.bind(address, port);

首选ipv6绑定,是因为ipv6可以接受到ipv4的套接字,而ipv4不可以接受ipv6的套接字,当然也有方法可以接收,就是麻烦了一点


上面的内容,请你认真看,因为下面会更复杂,设计到Node.js的多进程负载均衡原理

如果不是主进程,就调用cluster._getServer,找到cluster源码

代码语言:javascript
复制
'use strict';

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);

找到_getServer函数源码

代码语言:javascript
复制
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');

  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

我们之前传入了三个参数给它,分别是

server,serverQuery,listenOnMasterHandle


这里是比较复杂的,曾经我也在这里迷茫过一段时间,但是想着还是看下去吧。坚持下,大家如果看到这里看不下去了,先休息下,保存着。后面等心情平复了再静下来接下去看


首先我们传入了Server、serverQuery和cb(回调函数listenOnMasterHandle),整个cluster模块的_getServer中最重要的就是:

代码语言:javascript
复制
if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

首先我们会先获取server上的data数据,然后调用send函数

代码语言:javascript
复制
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

send函数调用的是cluster模块的utills文件内的函数,传入了一个默认值process

代码语言:javascript
复制

function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}

这里要看清楚,我们调用sendHelper传入的第三个参数是null !!!

那么主进程返回也是null

代码语言:javascript
复制
send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

所以我们会进入rr函数调用的这个判断,这里调用rr传入的cb就是在net.js模块定义的listenOnMasterHandle函数


Node.js的负载均衡算法是轮询,官方给出的解释是简单粗暴效率高

上面的sendHelper函数就是做到了这点,每次+1

代码语言:javascript
复制
 if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
代码语言:javascript
复制
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

此时的handle已经被重写,listen方法调用会返回0,不会再占用端口了。所以这样Node.js多个进程也只是一个进程监听端口而已

此时的cb还是net.js模块的setupListenHandle即 - _listen2方法。

官方的注释:

代码语言:javascript
复制
Faux handle. Mimics a TCPWrap with just enough fidelity to get away

仿句柄。以足够的保真度来模拟TCPWrap

花了一晚上整理,之前还有一些像cluster模块源码、pm2负载均衡原理等,有兴趣的可以翻一翻。觉得写得不错的可以点个在看,谢谢。时间匆忙,如果有写得不对的地方可以指出。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档