前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >websocket库ws原理分析

websocket库ws原理分析

作者头像
theanarkh
发布2021-05-08 16:04:18
1.4K0
发布2021-05-08 16:04:18
举报
文章被收录于专栏:原创分享

前言:本文几基于nodejs的ws模块分析websocket的原理。

ws服务器逻辑由websocket-server.js的WebSocketServer类实现。该类初始化了一些参数后就执行以下代码

代码语言:javascript
复制
if (this._server) {
      // 给server注册下面事件,返回一个注销函数(用于注销下面注册的事件)
      this._removeListeners = addListeners(this._server, {
        // listen成功的回调
        listening: this.emit.bind(this, 'listening'),
        error: this.emit.bind(this, 'error'),
        // 收到协议升级请求的回调
        upgrade: (req, socket, head) => {
          this.handleUpgrade(req, socket, head, (ws) => {
            // 处理成功,触发链接成功事件
            this.emit('connection', ws, req);
          });
        }
      });

我们看到ws监听了upgrade事件,当有websocket请求到来时就会执行handleUpgrade处理升级请求,升级成功后触发connection事件。我们先看handleUpgrade。handleUpgrade逻辑不多,主要是处理和校验升级请求的一些http头。ws提供了一个校验的钩子。处理完http头后,会调verifyClient校验是否允许升级请求。如果成功则执行completeUpgrade。顾名思义,completeUpgrade是完成升级请求的函数,该函数返回同意协议升级并且设置一些http响应头。另外还有一些重要的逻辑处理。

代码语言:javascript
复制
const ws = new WebSocket(null);
// 设置管理socket的数据
ws.setSocket(socket, head, this.options.maxPayload);
// cb就是this.emit('connection', ws, req);
cb(ws);

我们看到这里新建了一个WebSocket对象并且调用了他的setSocket函数。我们来看看他做了什么。setSocket的逻辑非常多,我们慢慢分析。

数据接收者

代码语言:javascript
复制
class Receiver extends Writable {}

我们看到数据接收者是一个可写流。这就意味着我们可以往里面写数据。

代码语言:javascript
复制
const receiver = new Receiver();
receiver.write('hello');

我们看一下这时候Receiver的逻辑。

代码语言:javascript
复制
_write(chunk, encoding, cb) {
    if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
    this._bufferedBytes += chunk.length;
    this._buffers.push(chunk);
    this.startLoop(cb);
  }

首先记录当前数据的大小,然后把数据存起来,最后执行startLoop。

代码语言:javascript
复制
startLoop(cb) {
    let err;
    this._loop = true;

    do {
      switch (this._state) {
        // 忽略其他case
        case GET_DATA:
          err = this.getData(cb);
          break;
        default:
          // `INFLATING`
          this._loop = false;
          return;
      }
    } while (this._loop);

    cb(err);
  }

我们知道websocket是基于tcp上层的应用层协议,所以我们收到数据时,需要解析出一个个数据包(粘包问题),所以Receiver其实就是一个状态机,每次收到数据的时候,都会根据当前的状态进行状态流转。比如当前处于GET_DATA状态,那么就会进行数据的处理。我们接着看一下数据处理的逻辑。

代码语言:javascript
复制
getData(cb) {
    let data = EMPTY_BUFFER;
    // 提取数据部分
    if (this._payloadLength) {
      data = this.consume(this._payloadLength);
      if (this._masked) unmask(data, this._mask);
    }
    // 是控制报文则执行controlMessage
    if (this._opcode > 0x07) return this.controlMessage(data);
    // 做了压缩,则先解压
    if (this._compressed) {
      this._state = INFLATING;
      this.decompress(data, cb);
      return;
    }
    // 没有压缩则直接处理(先存到_fragments,然后执行dataMessage)
    if (data.length) {
      this._messageLength = this._totalPayloadLength;
      this._fragments.push(data);
    }

    return this.dataMessage();
  }

我们执行websocket协议定义了报文的类型,比如控制报文,数据报文。我们分别看一下这两个的逻辑。

代码语言:javascript
复制
controlMessage(data) {
    // 连接关闭
    if (this._opcode === 0x08) {
      this._loop = false;
      if (data.length === 0) {
        this.emit('conclude', 1005, '');
        this.end();
      }
    } else if (this._opcode === 0x09) {
      this.emit('ping', data);
    } else {
      this.emit('pong', data);
    }
    this._state = GET_INFO;
  }

我们看到控制报文包括三种(conclude、ping、pong)。而数据报文只有this.emit('message', data);一种。这个就是接收者的整体逻辑。

2 数据发送者

数据发送者是对websocket协议的封装,当用户调研数据发送者的send接口发送数据时,数据发送者会组装成一个websocket协议的包再发送出去。

代码语言:javascript
复制
send(data, options, cb) {
    const buf = toBuffer(data);
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    let opcode = options.binary ? 2 : 1;
    let rsv1 = options.compress;

    if (this._firstFragment) {
      this._firstFragment = false;
      if (rsv1 && perMessageDeflate) {
        rsv1 = buf.length >= perMessageDeflate._threshold;
      }
      this._compress = rsv1;
    } else {
      rsv1 = false;
      opcode = 0;
    }

    if (options.fin) this._firstFragment = true;
    // 需要压缩
    if (perMessageDeflate) {
      const opts = {
        fin: options.fin,
        rsv1,
        opcode,
        mask: options.mask,
        readOnly: toBuffer.readOnly
      };
      // 正在压缩,则排队等待,否则执行压缩
      if (this._deflating) {
        this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
      } else {
        this.dispatch(buf, this._compress, opts, cb);
      }
    } else {
      // 不需要压缩,直接发送
      this.sendFrame(
        Sender.frame(buf, {
          fin: options.fin,
          rsv1: false,
          opcode,
          mask: options.mask,
          readOnly: toBuffer.readOnly
        }),
        cb
      );
    }
  }

send函数做了一些参数的处理后发送数据,但是如果需要压缩的话,要压缩后才能发送。数据处理完成后调用真正的发送函数

代码语言:javascript
复制
sendFrame(list, cb) {
    if (list.length === 2) {
      this._socket.cork();
      this._socket.write(list[0]);
      this._socket.write(list[1], cb);
      this._socket.uncork();
    } else {
      this._socket.write(list[0], cb);
    }
  }

了解了数据接收者和发送者的逻辑后,我们看一下websocket对象和setSocket函数做了什么事情,websocket对象本质是对TCP socket的封装。它接收来自底层的数据,然后透传给数据接收者,数据接收者处理完后,触发websocket对应的对应的事件,比如message事件。发送数据的时候,websocket会调用数据发送者的接口,数据发送者组装成websocket协议的数据包后再发送出去,架构如下图所示。

接下来我们看看setSocket的逻辑

代码语言:javascript
复制
setSocket(socket, head, maxPayload) {
    // 数据接收者,负责处理tcp上收到的数据(socket是tcp层的socket)
    const receiver = new Receiver(...);
    // 数据发送者,负责发送数据给对端
    this._sender = new Sender(socket, this._extensions);
    // 数据接收者,负责解析数据
    this._receiver = receiver;
    // net模块的tcp socket
    this._socket = socket;
    // 关联起来
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;
    // 监听接收者的事件,解析数据的时候会回调
    receiver.on('conclude', receiverOnConclude);
    // 下面两个事件由Writable触发
    receiver.on('drain', receiverOnDrain);
    receiver.on('error', receiverOnError);
    receiver.on('message', receiverOnMessage);
    receiver.on('ping', receiverOnPing);
    receiver.on('pong', receiverOnPong);
    // 清除定时器
    socket.setTimeout(0);
    // 关闭nagle算法
    socket.setNoDelay();
    // 升级请求中,携带的http body,通常是空
    if (head.length > 0) socket.unshift(head);
    // 监听tcp底层的事件
    socket.on('close', socketOnClose);
    socket.on('data', socketOnData);
    socket.on('end', socketOnEnd);
    socket.on('error', socketOnError);

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }

我们看到里面监听了各种事件,下面以data事件为例,看一下处理过程。当tcp socket收到数据的时候会执行socketOnData函数。

代码语言:javascript
复制
function socketOnData(chunk) {
  // 会调用receiver里的_write函数,其实就是换成到receiver对象上,如果数据解析出错,会触发socket error事件
  if (!this[kWebSocket]._receiver.write(chunk)) {
    this.pause();
  }
}

socketOnData通过接收者的接口把数据传给接收者,接收者会解析数据,然后触发对应的事件,比如message。

代码语言:javascript
复制
receiver.on('message', receiverOnMessage);
function receiverOnMessage(data) {
  this[kWebSocket].emit('message', data);
}

然后ws的socket对象继续往上层触发message事件。this[kWebSocket]的值是ws提供的socket对象本身。架构图如下。

这就是ws实现websocket协议的基本原理,具体细节可以参考源码。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据接收者
  • 2 数据发送者
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档