我们看一下engineio的入口
exports = module.exports = function() {
if (arguments.length && arguments[0] instanceof http.Server) {
return attach.apply(this, arguments);
}
return new Server(arguments);
};
当第一个参数是http.Server,即传入了一个http服务器的时候,engineio就会直接执行attach函数。否则返回一个新的Server对象,原理是类似的。通常我们会传入一个http server。所以我们从attach函数开始
function attach(server, options) {
const engine = new Server(options);
engine.attach(server, options);
return engine;
}
attach新建了一个Server,然后执行他的attach方法。我们先分析new Server的逻辑
constructor(opts = {}) {
// 忽略一系列参数处理
this.init();
}
接着看init
init() {
const wsModule = this.opts.wsEngine === "ws" ? require("ws") : require(this.opts.wsEngine);
this.ws = new wsModule.Server({
// 不使用websocket的服务器能力
noServer: true,
clientTracking: false,
perMessageDeflate: this.opts.perMessageDeflate,
maxPayload: this.opts.maxHttpBufferSize
});
}
我们看到new Server的逻辑很简单,只是做了一些初始化。他依赖了ws模块的能力,ws模块是一个实现了websocket协议的库。初始化完毕后就会执行attach。
attach(server, options) {
const self = this;
// 判断是不是自己的请求
function check(req) {
return path === req.url.substr(0, path.length);
}
// 监听upgrade事件
server.on("upgrade", function(req, socket, head) {
// 判断是不是自己的请求
if (check(req)) {
self.handleUpgrade(req, socket, head);
}
}
engineio监听了upgrade事件,当有websocket连接到来时,就会执行回调handleUpgrade进行处理。
// 处理websocket协议升级请求
handleUpgrade(req, socket, upgradeHead) {
// 解析query参数
this.prepare(req);
const self = this;
this.verify(req, true, function(err, success) {
if (!success) {
abortConnection(socket, err);
return;
}
const head = Buffer.from(upgradeHead); // eslint-disable-line node/no-deprecated-api
upgradeHead = null;
/*
使用ws模块完成websocket的升级,ws回复同意升级协议后执行onWebSocket,
conn是一个ws模块的WebSocket对象,见websocke-server的completeUpgrade
*/
self.ws.handleUpgrade(req, socket, head, function(conn) {
self.onWebSocket(req, conn);
});
});
}
首先调用verify进行校验,校验通过后才能完成websocket升级请求。接着执行handleUpgrade。handleUpgrade是ws模块实现的,该函数主要是返回同意websocket升级的http响应报文,然后传入一个ws封装的对象conn,执行回调 self.onWebSocket(req, conn)。
onWebSocket(req, socket) {
req.websocket = socket;
this.handshake(req._query.transport, req);
}
接着看handshake
async handshake(transportName, req) {
let id = await this.generateId(req);
// 新建一个通道
var transport = new transports[transportName](req);
const socket = new Socket(id,
// 对应的server
this,
// 对应的通道,Socket只是封装底层的能力
transport,
// 对应的websocket升级请求对象
req);
const self = this;
transport.onRequest(req);
// 应用层拿到的socket对象
this.emit("connection", socket);
}
handshake主要是三个逻辑 1 新建一个通道,engineio封装了websocket,xhr,jsonp等消息通道,我们这里只关注websocket。2 新建一个socket,该socket的底层消息通道是1中创建的。socket是对底层的封装,提供给上层使用的。3 触发connection事件 我们先看一下websocket通道的创建。
constructor(req) {
super(req);
// 保存ws模块的websocket对象
this.socket = req.websocket;
/*
注册事件,由ws模块触发,然后再往上层触发,
message事件是当解析到websocket协议中的数据部分时触发
*/
this.socket.on("message", this.onData.bind(this));
this.socket.once("close", this.onClose.bind(this));
this.socket.on("error", this.onError.bind(this));
this.socket.on("headers", headers => {
this.emit("headers", headers);
});
this.writable = true;
this.perMessageDeflate = null;
}
我们看到,websocket通道也只是对底层通道的封装,ws模块封装了底层tcp数据处理的能力。所以engineio的websocket通道只需要关注上层的逻辑就行。创建完通道后,接着看创建socket。
constructor(id, server, transport, req) {
super();
this.id = id;
// socket关联的服务器
this.server = server;
// 状态
this.readyState = "opening";
// 待写数据
this.writeBuffer = [];
this.packetsFn = [];
this.sentCallbackFn = [];
// 关闭数据通道时执行的清除函数集
this.cleanupFn = [];
// 对应的request
this.request = req;
this.checkIntervalTimer = null;
// 检测是否完成协议切换的定时器id
this.upgradeTimeoutTimer = null;
// 心跳对应的定时器
this.pingTimeoutTimer = null;
// 发送ping包后,多久没有收到回复则断开连接对应的定时器id
this.pingIntervalTimer = null;
// 设置通信通道
this.setTransport(transport);
this.onOpen();
}
刚才讲过socket只是对底层通道的封装,所以初始化的时候需要设置底层的通道,这里是websocket通道。
setTransport(transport) {
const onError = this.onError.bind(this);
const onPacket = this.onPacket.bind(this);
const flush = this.flush.bind(this);
const onClose = this.onClose.bind(this, "transport close");
// 通道有数据则通知socket等,socket再往上报
this.transport = transport;
// 监听websocket.js的对象的事件,通道出错
this.transport.once("error", onError);
this.transport.on("packet", onPacket);
// 触发drain事件则继续发送数据
this.transport.on("drain", flush);
// 通道关闭
this.transport.once("close", onClose);
// ...
}
socket对象监听了底层通道的一些事件。这个设计和客户端是一样的,一层套一层。设置完通道后,继续执行onOpen。
// 发送由long polling升级到websocket的通知包 或者是直接建立websocket通信时发生的包
onOpen() {
this.readyState = "open";
this.transport.sid = this.id;
this.sendPacket(
"open",
JSON.stringify({
sid: this.id,
// 可以升级到这个(些)协议
upgrades: this.getAvailableUpgrades(),
// 心跳间隔
pingInterval: this.server.opts.pingInterval,
// 多久后发送心跳
pingTimeout: this.server.opts.pingTimeout
})
);
// 触发open事件
this.emit("open");
// 开启心跳检测
this.schedulePing();
}
在分析客户端的时候说过,建立websocket连接成功后,服务器会push一个open,就是上面的代码实现的,最后开始心跳的机制,每隔一段时间就给客户端发送一个ping包,然后客户端会回复一个pong包。至此就完成了整个处理过程。接下来就可以进行数据的通信。当底层的websocket连接收到数据后,会触发message事件。我们看看这个处理过程。
this.socket.on("message", this.onData.bind(this));
// 有数据到来时执行,根据engine.io协议进行解析,解析触发onpacket事件
onData(data) {
this.onPacket(parser.decodePacket(data));
}
// 对数据解析后执行
onPacket(packet) {
this.emit("packet", packet);
}
如图所示
通过前面的分析可以知道,socket对象监听了packet事件
onPacket(packet) {
if ("open" === this.readyState) {
this.emit("packet", packet);
// 收到数据包后重置定时器,避免关闭连接
this.resetPingTimeout(
this.server.opts.pingInterval + this.server.opts.pingTimeout
);
// type见engine.io-parser的decode
switch (packet.type) {
// 收到pong之后,等待一段时间后,继续发送ping包
case "pong":
this.schedulePing();
this.emit("heartbeat");
break;
// 解析出错,关闭socket
case "error":
this.onClose("parse error");
break;
case "message":
// data是上层数据,比如socketio协议的数据包
this.emit("data", packet.data);
this.emit("message", packet.data);
break;
}
}
}
我们看到收到数据包的时候,engineio会根据不同的包类型做不同处理。