前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >engine.io原理分析

engine.io原理分析

作者头像
theanarkh
发布2021-05-08 16:03:55
5150
发布2021-05-08 16:03:55
举报
文章被收录于专栏:原创分享原创分享

我们看一下engineio的入口

代码语言:javascript
复制
exports = module.exports = function() {
  if (arguments.length && arguments[0] instanceof http.Server) {
    return attach.apply(this, arguments);
  }
  return new Server(arguments);
};

当第一个参数是http.Server,即传入了一个http服务器的时候,engineio就会直接执行attach函数。否则返回一个新的Server对象,原理是类似的。通常我们会传入一个http server。所以我们从attach函数开始

代码语言:javascript
复制
function attach(server, options) {
  const engine = new Server(options);
  engine.attach(server, options);
  return engine;
}

attach新建了一个Server,然后执行他的attach方法。我们先分析new Server的逻辑

代码语言:javascript
复制
constructor(opts = {}) {
    // 忽略一系列参数处理
    this.init();
  }

接着看init

代码语言:javascript
复制
init() {
    const wsModule = this.opts.wsEngine === "ws" ? require("ws") : require(this.opts.wsEngine);
    this.ws = new wsModule.Server({
      // 不使用websocket的服务器能力
      noServer: true,
      clientTracking: false,
      perMessageDeflate: this.opts.perMessageDeflate,
      maxPayload: this.opts.maxHttpBufferSize
    });
  }

我们看到new Server的逻辑很简单,只是做了一些初始化。他依赖了ws模块的能力,ws模块是一个实现了websocket协议的库。初始化完毕后就会执行attach。

代码语言:javascript
复制
attach(server, options) {
    const self = this;
    // 判断是不是自己的请求
    function check(req) {
      return path === req.url.substr(0, path.length);
    }
    // 监听upgrade事件
    server.on("upgrade", function(req, socket, head) {
        // 判断是不是自己的请求
        if (check(req)) {
          self.handleUpgrade(req, socket, head);
        }
  }

engineio监听了upgrade事件,当有websocket连接到来时,就会执行回调handleUpgrade进行处理。

代码语言:javascript
复制
// 处理websocket协议升级请求
  handleUpgrade(req, socket, upgradeHead) {
    // 解析query参数
    this.prepare(req);

    const self = this;
    this.verify(req, true, function(err, success) {
      if (!success) {
        abortConnection(socket, err);
        return;
      }

      const head = Buffer.from(upgradeHead); // eslint-disable-line node/no-deprecated-api
      upgradeHead = null;

      /*
        使用ws模块完成websocket的升级,ws回复同意升级协议后执行onWebSocket,
        conn是一个ws模块的WebSocket对象,见websocke-server的completeUpgrade
      */
      self.ws.handleUpgrade(req, socket, head, function(conn) {
        self.onWebSocket(req, conn);
      });
    });
  }

首先调用verify进行校验,校验通过后才能完成websocket升级请求。接着执行handleUpgrade。handleUpgrade是ws模块实现的,该函数主要是返回同意websocket升级的http响应报文,然后传入一个ws封装的对象conn,执行回调 self.onWebSocket(req, conn)。

代码语言:javascript
复制
onWebSocket(req, socket) {
    req.websocket = socket;
    this.handshake(req._query.transport, req);
  }

接着看handshake

代码语言:javascript
复制
async handshake(transportName, req) {
    let id = await this.generateId(req);
    // 新建一个通道
    var transport = new transports[transportName](req);
    const socket = new Socket(id, 
                              // 对应的server
                              this, 
                              // 对应的通道,Socket只是封装底层的能力
                              transport, 
                              // 对应的websocket升级请求对象
                              req);
    const self = this;
    transport.onRequest(req);
    // 应用层拿到的socket对象
    this.emit("connection", socket);
  }

handshake主要是三个逻辑 1 新建一个通道,engineio封装了websocket,xhr,jsonp等消息通道,我们这里只关注websocket。2 新建一个socket,该socket的底层消息通道是1中创建的。socket是对底层的封装,提供给上层使用的。3 触发connection事件 我们先看一下websocket通道的创建。

1 创建通道

代码语言:javascript
复制
constructor(req) {
    super(req);
    // 保存ws模块的websocket对象
    this.socket = req.websocket;
    /*
      注册事件,由ws模块触发,然后再往上层触发,
      message事件是当解析到websocket协议中的数据部分时触发
    */
    this.socket.on("message", this.onData.bind(this));
    this.socket.once("close", this.onClose.bind(this));
    this.socket.on("error", this.onError.bind(this));
    this.socket.on("headers", headers => {
      this.emit("headers", headers);
    });
    this.writable = true;
    this.perMessageDeflate = null;
  }

我们看到,websocket通道也只是对底层通道的封装,ws模块封装了底层tcp数据处理的能力。所以engineio的websocket通道只需要关注上层的逻辑就行。创建完通道后,接着看创建socket。

2 创建socket

代码语言:javascript
复制
constructor(id, server, transport, req) {
    super();
    this.id = id;
    // socket关联的服务器
    this.server = server;
    // 状态
    this.readyState = "opening";
    // 待写数据
    this.writeBuffer = [];
    this.packetsFn = [];
    this.sentCallbackFn = [];
    // 关闭数据通道时执行的清除函数集
    this.cleanupFn = [];
    // 对应的request
    this.request = req;
    this.checkIntervalTimer = null;
    // 检测是否完成协议切换的定时器id
    this.upgradeTimeoutTimer = null;
    // 心跳对应的定时器
    this.pingTimeoutTimer = null;
    // 发送ping包后,多久没有收到回复则断开连接对应的定时器id
    this.pingIntervalTimer = null;
    // 设置通信通道
    this.setTransport(transport);
    this.onOpen();
  }

刚才讲过socket只是对底层通道的封装,所以初始化的时候需要设置底层的通道,这里是websocket通道。

代码语言:javascript
复制
setTransport(transport) {
    const onError = this.onError.bind(this);
    const onPacket = this.onPacket.bind(this);
    const flush = this.flush.bind(this);
    const onClose = this.onClose.bind(this, "transport close");
    // 通道有数据则通知socket等,socket再往上报
    this.transport = transport;
    // 监听websocket.js的对象的事件,通道出错
    this.transport.once("error", onError);
    this.transport.on("packet", onPacket);
    // 触发drain事件则继续发送数据
    this.transport.on("drain", flush);
    // 通道关闭
    this.transport.once("close", onClose);
    // ...
  }

socket对象监听了底层通道的一些事件。这个设计和客户端是一样的,一层套一层。设置完通道后,继续执行onOpen。

代码语言:javascript
复制
// 发送由long polling升级到websocket的通知包 或者是直接建立websocket通信时发生的包
  onOpen() {
    this.readyState = "open";
    this.transport.sid = this.id;
    this.sendPacket(
      "open",
      JSON.stringify({
        sid: this.id,
        // 可以升级到这个(些)协议
        upgrades: this.getAvailableUpgrades(),
        // 心跳间隔
        pingInterval: this.server.opts.pingInterval,
        // 多久后发送心跳
        pingTimeout: this.server.opts.pingTimeout
      })
    );

    // 触发open事件
    this.emit("open");
    // 开启心跳检测
    this.schedulePing();
  }

在分析客户端的时候说过,建立websocket连接成功后,服务器会push一个open,就是上面的代码实现的,最后开始心跳的机制,每隔一段时间就给客户端发送一个ping包,然后客户端会回复一个pong包。至此就完成了整个处理过程。接下来就可以进行数据的通信。当底层的websocket连接收到数据后,会触发message事件。我们看看这个处理过程。

代码语言:javascript
复制
this.socket.on("message", this.onData.bind(this));
  // 有数据到来时执行,根据engine.io协议进行解析,解析触发onpacket事件
  onData(data) {
    this.onPacket(parser.decodePacket(data));
  }
  // 对数据解析后执行
  onPacket(packet) {
    this.emit("packet", packet);
  }

如图所示

通过前面的分析可以知道,socket对象监听了packet事件

代码语言:javascript
复制
onPacket(packet) {
    if ("open" === this.readyState) {
      this.emit("packet", packet);
      // 收到数据包后重置定时器,避免关闭连接
      this.resetPingTimeout(
        this.server.opts.pingInterval + this.server.opts.pingTimeout
      );
      // type见engine.io-parser的decode
      switch (packet.type) {
        // 收到pong之后,等待一段时间后,继续发送ping包
        case "pong":
          this.schedulePing();
          this.emit("heartbeat");
          break;
        // 解析出错,关闭socket
        case "error":

          this.onClose("parse error");
          break;

        case "message":
          // data是上层数据,比如socketio协议的数据包
          this.emit("data", packet.data);
          this.emit("message", packet.data);
          break;
      }
    } 
  }

我们看到收到数据包的时候,engineio会根据不同的包类型做不同处理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 创建通道
  • 2 创建socket
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档