前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >nodejs源码分析之http Agent

nodejs源码分析之http Agent

作者头像
theanarkh
发布2021-04-22 10:57:43
8940
发布2021-04-22 10:57:43
举报
文章被收录于专栏:原创分享原创分享

Agent对TCP连接进行了池化管理。简单的情况下,客户端发送一个HTTP请求之前,首先建立一个TCP连接,收到响应后会立刻关闭TCP连接。但是我们知道TCP的三次握手是比较耗时的。所以如果我们能复用TCP连接,在一个TCP连接上发送多个HTTP请求和接收多个HTTP响应,那么在性能上面就会得到很大的提升。Agent的作用就是复用TCP连接。不过Agent的模式是在一个TCP连接上串行地发送请求和接收响应,不支持HTTP PipeLine模式。下面我们看一下Agent模块的具体实现。看它是如何实现TCP连接复用的。

代码语言:javascript
复制
1.  function Agent(options) {  
2.    if (!(this instanceof Agent))  
3.      return new Agent(options);  
4.    EventEmitter.call(this);  
5.    this.defaultPort = 80;  
6.    this.protocol = 'http:';  
7.    this.options = { ...options };  
8.    // path字段表示是本机的进程间通信时使用的路径,比如Unix域路径  
9.    this.options.path = null;  
10.    // socket个数达到阈值后,等待空闲socket的请求  
11.    this.requests = {};  
12.    // 正在使用的socket  
13.    this.sockets = {};  
14.    // 空闲socket  
15.    this.freeSockets = {};  
16.    // 空闲socket的存活时间  
17.    this.keepAliveMsecs = this.options.keepAliveMsecs || 1000;  
18.    /* 
19.      用完的socket是否放到空闲队列, 
20.        开启keepalive才会放到空闲队列, 
21.        不开启keepalive 
22.          还有等待socket的请求则复用socket 
23.          没有等待socket的请求则直接销毁socket 
24.    */  
25.    this.keepAlive = this.options.keepAlive || false;  
26.    // 最大的socket个数,包括正在使用的和空闲的socket  
27.    this.maxSockets = this.options.maxSockets 
28.                        || Agent.defaultMaxSockets;  
29.    // 最大的空闲socket个数  
30.    this.maxFreeSockets = this.options.maxFreeSockets || 256;  
31.  }

Agent维护了几个数据结构,分别是等待socket的请求、正在使用的socket、空闲socket。每一个数据结构是一个对象,对象的key是根据HTTP请求参数计算的。对象的值是一个队列。具体结构如图所示。

下面我们看一下Agent模块的具体实现。

1 key的计算

key的计算是池化管理的核心。正确地设计key的计算规则,才能更好地利用池化带来的好处。

代码语言:javascript
复制
1.  // 一个请求对应的key  
2.  Agent.prototype.getName = function getName(options) {  
3.    let name = options.host || 'localhost'; 
4.    name += ':';  
5.    if (options.port)  
6.      name += options.port;  
7.    name += ':';  
8.    if (options.localAddress)  
9.      name += options.localAddress;  
10.    if (options.family === 4 || options.family === 6)  
11.      name += `:${options.family}`;  
12.    if (options.socketPath)  
13.      name += `:${options.socketPath}`; 
14.    return name;  
15.  };  

我们看到key由host、port、本地地址、地址簇类型、unix路径计算而来。所以不同的请求只有这些因子都一样的情况下才能复用连接。另外我们看到Agent支持Unix域。

2 创建一个socket

代码语言:javascript
复制
1.  function createSocket(req, options, cb) {  
2.    options = { ...options, ...this.options };  
3.    // 计算key
4.    const name = this.getName(options);  
5.    options._agentKey = name;  
6.    options.encoding = null;  
7.    let called = false;  
8.    // 创建socket完毕后执行的回调
9.    const oncreate = (err, s) => {  
10.      if (called)  
11.        return;  
12.      called = true;  
13.      if (err)  
14.        return cb(err);  
15.      if (!this.sockets[name]) {  
16.        this.sockets[name] = [];  
17.      }  
18.      // 插入正在使用的socket队列  
19.      this.sockets[name].push(s); 
20.       // 监听socket的一些事件,用于回收socket 
21.      installListeners(this, s, options); 
22.      // 有可用socket,通知调用方 
23.      cb(null, s);  
24.    };  
25.    // 创建一个新的socket,使用net.createConnection  
26.    const newSocket = this.createConnection(options, oncreate);  
27.    if (newSocket)  
28.      oncreate(null, newSocket);  
29.  }  
30.    
31.  function installListeners(agent, s, options) {  
32.    /*
33.      socket触发空闲事件的处理函数,告诉agent该socket空闲了,
34.      agent会回收该socket到空闲队列  
35.    */
36.    function onFree() {  
37.      agent.emit('free', s, options);  
38.    }  
39.    /* 
40.      监听socket空闲事件,调用方使用完socket后触发,
41.      通知agent socket用完了 
42.    */ 
43.    s.on('free', onFree);  
44.    
45.    function onClose(err) {  
46.      agent.removeSocket(s, options);  
47.    }  
48.    // socket关闭则agent会从socket队列中删除它  
49.    s.on('close', onClose);  
50.    
51.    function onRemove() {  
52.      agent.removeSocket(s, options);  
53.      s.removeListener('close', onClose);  
54.      s.removeListener('free', onFree);  
55.      s.removeListener('agentRemove', onRemove);  
56.    }  
57.    // agent被移除  
58.    s.on('agentRemove', onRemove);  
59.    
60.  }  

创建socket的主要逻辑如下

1 调用net模块创建一个socket(TCP或者Unix域),然后插入使用中的socket队列,最后通知调用方socket创建成功。

2 监听socket的close、free事件和agentRemove事件,触发时从队列中删除socket。

3 删除socket

代码语言:javascript
复制
1.  // 把socket从正在使用队列或者空闲队列中移出  
2.  function removeSocket(s, options) {  
3.    const name = this.getName(options);  
4.    const sets = [this.sockets];  
5.    /*
6.      socket不可写了,则有可能是存在空闲的队列中,
7.      所以需要遍历空闲队列,因为removeSocket只会在
8.      使用完socket或者socket关闭的时候被调用,前者只有在
9.      可写状态时会调用,后者是不可写的
10.    */
11.    if (!s.writable)  
12.      sets.push(this.freeSockets);  
13.    // 从队列中删除对应的socket  
14.    for (const sockets of sets) {  
15.      if (sockets[name]) {  
16.        const index = sockets[name].indexOf(s);  
17.        if (index !== -1) {  
18.          sockets[name].splice(index, 1);  
19.          // Don't leak  
20.          if (sockets[name].length === 0)  
21.            delete sockets[name];  
22.        }  
23.      }  
24.    }  
25.    /* 
26.      如果还有在等待socekt的请求,则创建socket去处理它, 
27.      因为socket数已经减一了,说明socket个数还没有达到阈值
28.      但是这里应该先判断是否还有空闲的socket,有则可以复用,
29.      没有则创建新的socket 
30.    */  
31.    if (this.requests[name] && this.requests[name].length) {  
32.      const req = this.requests[name][0];  
33.      const socketCreationHandler = handleSocketCreation(this, 
34.                                                              req,            
35.                                                              false);  
36.      this.createSocket(req, options, socketCreationHandler);  
37.    }  
38.  };  

前面已经分析过,Agent维护了两个socket队列,删除socket就是从这两个队列中找到对应的socket,然后移除它。移除后需要判断一下是否还有等待socket的请求队列,有的话就新建一个socket去处理它。因为移除了一个socket,就说明可以新增一个socket。

4 设置socket keepalive

当socket被使用完并且被插入空闲队列后,需要重新设置socket的keepalive值。等到超时会自动关闭socket。在一个socket上调用一次setKeepAlive就可以了,这里可能会导致多次调用setKeepAlive,不过也没有影响。

代码语言:javascript
复制
1.  function keepSocketAlive(socket) {  
2.    socket.setKeepAlive(true, this.keepAliveMsecs);  
3.    socket.unref();  
4.    return true;  
5.  };  

另外需要设置ref标记,防止该socket阻止事件循环的退出,因为该socket是空闲的,不应该影响事件循环的退出。

5 复用socket

代码语言:javascript
复制
1.  function reuseSocket(socket, req) {  
2.    req.reusedSocket = true;  
3.    socket.ref();  
4.  };  

重新使用该socket,需要修改ref标记,阻止事件循环退出,并标记请求使用的是复用socket。

6 销毁Agent

代码语言:javascript
复制
1.  function destroy() {  
2.    for (const set of [this.freeSockets, this.sockets]) {  
3.      for (const key of ObjectKeys(set)) {  
4.        for (const setName of set[key]) {  
5.          setName.destroy();  
6.        }  
7.      }  
8.    }  
9.  }; 

因为Agent本质上是一个socket池,销毁Agent即销毁池里维护的所有socket。

7 使用连接池

我们看一下如何使用Agent。

代码语言:javascript
复制
1.  function addRequest(req, options, port, localAddress) {  
2.    // 参数处理  
3.    if (typeof options === 'string') {  
4.      options = {  
5.        host: options,  
6.        port,  
7.        localAddress  
8.      };  
9.    }  
10.    
11.    options = { ...options, ...this.options };  
12.    if (options.socketPath)  
13.      options.path = options.socketPath;  
14.    
15.    if (!options.servername && options.servername !== '')  
16.      options.servername = calculateServerName(options, req);  
17.    // 拿到请求对应的key  
18.    const name = this.getName(options);  
19.    // 该key还没有在使用的socekt则初始化数据结构  
20.    if (!this.sockets[name]) {  
21.      this.sockets[name] = [];  
22.    }  
23.    // 该key对应的空闲socket列表  
24.    const freeLen = this.freeSockets[name] ? 
25.                      this.freeSockets[name].length : 0;  
26.    // 该key对应的所有socket个数  
27.    const sockLen = freeLen + this.sockets[name].length;  
28.    // 该key有对应的空闲socekt  
29.    if (freeLen) {    
30.      // 获取一个该key对应的空闲socket  
31.      const socket = this.freeSockets[name].shift();  
32.      // 取完了删除,防止内存泄漏  
33.      if (!this.freeSockets[name].length)  
34.        delete this.freeSockets[name];  
35.      // 设置ref标记,因为正在使用该socket  
36.      this.reuseSocket(socket, req);  
37.      // 设置请求对应的socket  
38.      setRequestSocket(this, req, socket);  
39.      // 插入正在使用的socket队列  
40.      this.sockets[name].push(socket);  
41.    } else if (sockLen < this.maxSockets) {   
42.      /* 
43.        如果该key没有对应的空闲socket并且使用的 
44.        socket个数还没有得到阈值,则继续创建 
45.      */  
46.      this.createSocket(req,
47.                          options, 
48.                          handleSocketCreation(this, req, true));  
49.    } else {  
50.      // 等待该key下有空闲的socket  
51.      if (!this.requests[name]) {  
52.        this.requests[name] = [];  
53.      }  
54.      this.requests[name].push(req);  
55.    }  
56.  }  

当我们需要发送一个HTTP请求的时候,我们可以通过Agent的addRequest方法把请求托管到Agent中,当有可用的socket时,Agent会通知我们。addRequest的代码很长,主要分为三种情况。

1 有空闲socket,则直接复用,并插入正在使用的socket队列中

我们主要看一下setRequestSocket函数

代码语言:javascript
复制
1.  function setRequestSocket(agent, req, socket) {  
2.    // 通知请求socket创建成功  
3.    req.onSocket(socket);  
4.    const agentTimeout = agent.options.timeout || 0;  
5.    if (req.timeout === undefined || req.timeout === agentTimeout) 
6.    {  
7.      return;  
8.    }  
9.    // 开启一个定时器,过期后触发timeout事件  
10.    socket.setTimeout(req.timeout);  
11.    /*
12.      监听响应事件,响应结束后需要重新设置超时时间,
13.      开启下一个请求的超时计算,否则会提前过期 
14.    */ 
15.    req.once('response', (res) => {  
16.      res.once('end', () => {  
17.        if (socket.timeout !== agentTimeout) {  
18.          socket.setTimeout(agentTimeout);  
19.        }  
20.      });  
21.    });  
22.  }  

setRequestSocket函数通过req.onSocket(socket)通知调用方有可用socket。然后如果请求设置了超时时间则设置socket的超时时间,即请求的超时时间。最后监听响应结束事件,重新设置超时时间。

2 没有空闲socket,但是使用的socket个数还没有达到阈值,则创建新的socket。

我们主要分析创建socket后的回调handleSocketCreation。

代码语言:javascript
复制
1.  function handleSocketCreation(agent, request, informRequest) {  
2.    return function handleSocketCreation_Inner(err, socket) {  
3.      if (err) {  
4.        process.nextTick(emitErrorNT, request, err);  
5.        return;  
6.      }  
7.      /* 
8.       是否需要直接通知请求方,这时候request不是来自等待
9.        socket的requests队列, 而是来自调用方,见addRequest 
10.      */  
11.      if (informRequest)  
12.        setRequestSocket(agent, request, socket);  
13.      else  
14.        /*
15.          不直接通知,先告诉agent有空闲的socket,
16.          agent会判断是否有正在等待socket的请求,有则处理  
17.         */
18.        socket.emit('free');  
19.    };  
20.  }  

3 不满足1,2,则把请求插入等待socket队列。

插入等待socket队列后,当有socket空闲时会触发free事件,我们看一下该事件的处理逻辑。

代码语言:javascript
复制
1.  // 监听socket空闲事件  
2.   this.on('free', (socket, options) => {  
3.     const name = this.getName(options);
4.     // socket还可写并且还有等待socket的请求,则复用socket  
5.     if (socket.writable &&  
6.         this.requests[name] && this.requests[name].length) {  
7.       // 拿到一个等待socket的请求,然后通知它有socket可用  
8.       const req = this.requests[name].shift();  
9.       setRequestSocket(this, req, socket);  
10.       // 没有等待socket的请求则删除,防止内存泄漏  
11.       if (this.requests[name].length === 0) {  
12.         // don't leak  
13.         delete this.requests[name];  
14.       }  
15.     } else {  
16.       // socket不可用写或者没有等待socket的请求了  
17.       const req = socket._httpMessage;  
18.       // socket可写并且请求设置了允许使用复用的socket  
19.       if (req &&  
20.           req.shouldKeepAlive &&  
21.           socket.writable &&  
22.           this.keepAlive) {  
23.         let freeSockets = this.freeSockets[name];  
24.         // 该key下当前的空闲socket个数  
25.         const freeLen = freeSockets ? freeSockets.length : 0;  
26.         let count = freeLen;  
27.         // 正在使用的socket个数  
28.         if (this.sockets[name])  
29.           count += this.sockets[name].length;  
30.         /*
31.             该key使用的socket个数达到阈值或者空闲socket达到阈值,
32.             则不复用socket,直接销毁socket  
33.          */
34.         if (count > this.maxSockets || 
             freeLen >= this.maxFreeSockets) {  
35.           socket.destroy();  
36.         } else if (this.keepSocketAlive(socket)) {   
37.           /*
38.              重新设置socket的存活时间,设置失败说明无法重新设置存活时
39.              间,则说明可能不支持复用  
40.            */
41.           freeSockets = freeSockets || [];  
42.           this.freeSockets[name] = freeSockets;  
43.           socket[async_id_symbol] = -1;  
44.           socket._httpMessage = null;  
45.           // 把socket从正在使用队列中移除  
46.           this.removeSocket(socket, options);  
47.           // 插入socket空闲队列  
48.           freeSockets.push(socket);  
49.         } else {  
50.           // 不复用则直接销毁  
51.           socket.destroy();  
52.         }  
53.       } else {  
54.         socket.destroy();  
55.       }  
56.     }  
57.   });  

当有socket空闲时,分为以下几种情况

1 如果有等待socket的请求,则直接复用socket。

2 如果没有等待socket的请求,允许复用并且socket个数没有达到阈值则插入空闲队列。

3 直接销毁

8 使用例子

下面我们从_http_client.js为例子看看如何使用agent。_http_client.js是对http客户端的封装,当我们使用nodejs发送一个请求的时候,就会使用_http_client.js的ClientRequest。

代码语言:javascript
复制
let agent = options.agent;
this.agent = agent;
this.agent.addRequest(this, options);

我们看到当使用agent的时候,ClientRequest会把请求托管给agent,当agent有可用socket的时候,就会执行ClientRequest实例的onSocket方法。ClientRequest会使用拿到的socket发送数据并解析收到的响应,那么收到响应后ClientRequest又是怎么处理的呢?ClientRequest中有一句关键代码

代码语言:javascript
复制
// 等待响应结束
  res.on('end', responseOnEnd)

responseOnEnd通过层层调用后,执行

req.socket.emit('free');

在_http_agent.js的installListener中监听了free事件

代码语言:javascript
复制
 // socket触发空闲事件的处理函数,告诉agent该socket空闲了,agent会回收该socket到空闲队列
  function onFree() {
    debug('CLIENT socket onFree');
    agent.emit('free', s, options);
  }
  // 监听socket空闲事件
  s.on('free', onFree);

socket进一步触发了agent的free,从而agent处理空闲的socket,销毁或者复用。

9 测试例子

客户端

代码语言:javascript
复制
1.  const http = require('http');  
2.  const keepAliveAgent = new http.Agent({ keepAlive: true, maxSockets: 1 });  
3.  const options = {port: 10000, method: 'GET',  host: '127.0.0.1',}  
4.  options.agent = keepAliveAgent;  
5.  http.get(options, () => {});  
6.  http.get(options, () => {});  
7.  // 等待的请求个数
8.  console.log(Object.keys(options.agent.requests).length)  

服务器

代码语言:javascript
复制
1.  let i =0;  
2.  const net = require('net');  
3.  net.createServer((socket) => {  
4.    console.log(++i);  
5.  }).listen(10000);  

在例子中,首先创建了一个tcp服务器。然后在客户端使用agent。但是maxSocket的值为1,代表最多只能有一个socket,而这时候客户端发送两个请求,所以有一个请求就会在排队。服务器也只收到了一个连接。服务器只输出1。当我们把maxSockets改成2则会看到输出1,2。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档