http1.0的时候,不支持pipeline,客户端发送一个请求的时候,首先建立tcp连接,然后服务器返回一个响应,最后断开tcp连接,这种是最简单的实现方式,但是每次发送请求都需要走三次握手显然会带来一定的时间损耗,所以http1.1的时候,支持了pipeline。pipeline的意思就是可以在一个tcp连接上发送多个请求,这样服务器就可以同时处理多个请求,但是由于http1.1的限制,多个请求的响应需要按序返回。因为在http1.1中,没有标记请求和响应的对应关系。所以http客户端会假设第一个返回的响应是对应第一个请求的。如果乱序返回,就会导致问题。
在http2.0中,每个请求会分配一个id,响应中也会返回对应的id,这样就算乱序返回,http客户端也可以知道响应所对应的请求。在http1.1这种情况下,http服务器的实现就会变得复杂,服务器可以以串行的方式处理请求,当前面请求的响应返回到客户端后,再继续处理下一个请求,这种实现方式是相对简单的,但是很明显,这种方式相对来说还是比较低效的,另一种实现方式是并行处理请求,串行返回,这样可以让请求得到尽快的处理,比如两个请求都访问数据库,那并行处理两个请求就会比串行快得多,但是这种实现方式相对比较复杂,nodejs就是属于这种方式,下面我们来看一下nodejs中是如何实现的。首先我们看一下如何创建一个http服务器。
function createServer(opts, requestListener) {
return new Server(opts, requestListener);
}
function Server(options, requestListener) {
// 可以自定义表示请求的对象和响应的对象
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
// 允许半关闭
net.Server.call(this, { allowHalfOpen: true });
// 有请求时的回调
if (requestListener) {
this.on('request', requestListener);
}
// 服务器socket读端关闭时是否允许继续处理队列里的响应(tcp上有多个请求,pipeline)
this.httpAllowHalfOpen = false;
// 有连接时的回调,由net模块触发
this.on('connection', connectionListener);
// 同一个tcp连接上,两个请求之前最多间隔的时间
this.keepAliveTimeout = 5000;
// 解析头部的超时时间,防止ddos
this.headersTimeout = 60 * 1000; // 60 seconds
}
nodejs监听了两个事件connection和request。分别表示在由新连接和新的http请求。我们主要看一下connect,因为发送http请求首先需要建立一个tcp连接。
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
function connectionListenerInternal(server, socket) {
socket.server = server;
// 分配一个http解析器
const parser = parsers.alloc();
// 解析请求报文
parser.initialize(
HTTPParser.REQUEST,
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket),
server.maxHeaderSize || 0,
server.insecureHTTPParser === undefined ?
isLenient() : server.insecureHTTPParser,
);
parser.socket = socket;
// 开始解析头部的开始时间
parser.parsingHeadersStart = nowDate();
socket.parser = parser;
const state = {
onData: null,
onEnd: null,
onClose: null,
onDrain: null,
// 同一tcp连接上,请求和响应的的队列
outgoing: [],
incoming: [],
outgoingData: 0,
keepAliveTimeoutSet: false
};
state.onData = socketOnData.bind(undefined, server, socket, parser, state);
state.onEnd = socketOnEnd.bind(undefined, server, socket, parser, state);
// tcp连接上有数据到来时的回调
socket.on('data', state.onData);
// tcp读端结束时的回调
socket.on('end', state.onEnd);
// 解析完http请求头时的回调
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state);
}
nodejs注册了事件等待tcp上数据的到来。我们看一下有数据到来时nodejs的处理。
function socketOnData(server, socket, parser, state, d) {
// 交给http解析器处理
const ret = parser.execute(d);
onParserExecuteCommon(server, socket, parser, state, ret, d);
}
我们看一下http解析器的一些逻辑。
const parsers = new FreeList('parsers', 1000, function parsersCb() {
const parser = new HTTPParser();
cleanParser(parser);
// 解析完头部的回调
parser.onIncoming = null;
// 解析http头时的回调,在http头个数达到阈值时回调,可能会回调多次
parser[kOnHeaders] = parserOnHeaders;
// 解析完http头时的回调,会执行onIncoming
parser[kOnHeadersComplete] = parserOnHeadersComplete;
// 解析body时的回调
parser[kOnBody] = parserOnBody;
// 解析完http报文时的回调
parser[kOnMessageComplete] = parserOnMessageComplete;
return parser;
});
从上面的代码中我们可以知道,nodejs在tcp连接上接收到数据后,会交给http解析器处理,http是一个非常复杂的状态机,在解析数据的时候会回调nodejs设置的各种钩子。这里我们只需要关注kOnHeadersComplete钩子。
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
url, statusCode, statusMessage, upgrade,
shouldKeepAlive) {
// 新建一个表示请求的对象,一般是IncomingMessage
const ParserIncomingMessage = (socket && socket.server &&
socket.server[kIncomingMessage]) ||
IncomingMessage;
// 新建一个IncomingMessage对象
const incoming = parser.incoming = new ParserIncomingMessage(socket);
incoming.httpVersionMajor = versionMajor;
incoming.httpVersionMinor = versionMinor;
incoming.httpVersion = `${versionMajor}.${versionMinor}`;
incoming.url = url;
incoming.upgrade = upgrade;
// ...
// 执行回调
return parser.onIncoming(incoming, shouldKeepAlive);
}
我们刚才看到nodejs注册的onIncoming回调是parserOnIncoming。
function parserOnIncoming(server, socket, state, req, keepAlive) {
// 标记头部解析完毕
socket.parser.parsingHeadersStart = 0;
// 请求入队
state.incoming.push(req);
// 新建一个表示响应的对象,一般是ServerResponse
const res = new server[kServerResponse](req);
// socket当前已经在处理其他请求的响应,则先排队,否则挂载响应对象到socket,作为当前处理的响应
if (socket._httpMessage) {
state.outgoing.push(res);
} else {
res.assignSocket(socket); // socket._httpMessage = res;
}
// 响应处理完毕后,需要做一些处理
res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));
// 触发request事件说明有请求到来
server.emit('request', req, res);
return 0;
}
当nodejs解析http请求头完成后,就会创建一个ServerResponse对象表示响应。然后判断当前是否有正在处理的响应,如果有则排队等待处理,否则把新建的ServerResponse对象作为当前需要处理的响应。最后触发request事件通知用户层。用户就可以进行请求的处理了。我们看到nodejs维护了两个队列,分别是请求和响应队列。
当前处理的请求在请求队列的队首,该请求对应的响应会挂载到socket的_httpMessage属性上。但是我们看到nodejs会触发request事件通知用户有新请求到来,所有在pipeline的情况下,nodejs会并行处理多个请求(如果是cpu密集型的请求则实际上还是会变成串行,这和nodejs的单线程相关)。那nodejs是如何控制响应的顺序的呢?我们知道每次触发request事件的时候,我们都会执行一个函数。比如下面的代码。
http.createServer((req, res) => {
// 一些网络io
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('okay');
});
我们看到每个请求的处理是独立的。假设每个请求都去操作数据库,如果请求2比请求1先完成数据库的操作,从而请求2先执行res.write和res.end。那岂不是请求2先返回?我们看一下ServerResponse和OutgoingMessage的实现,揭开迷雾。ServerResponse是OutgoingMessage的子类。write函数是在OutgoingMessage中实现的,write的调用链路很长,我们不层层分析,直接看最后的节点。
function _writeRaw(data, encoding, callback) {
const conn = this.socket;
// socket对应的响应是自己并且可写
if (conn && conn._httpMessage === this && conn.writable) {
// 如果有缓存的数据则先发送缓存的数据
if (this.outputData.length) {
this._flushOutput(conn);
}
// 接着发送当前需要发送的
return conn.write(data, encoding, callback);
}
// socket当前处理的响应对象不是自己,则先缓存数据。
this.outputData.push({ data, encoding, callback });
this.outputSize += data.length;
this._onPendingData(data.length);
return this.outputSize < HIGH_WATER_MARK;
}
我们看到我们调用res.write的时候,nodejs会首先判断,res是不是属于当前处理中响应,如果是才会真正发送数据,否则会先把数据缓存起来。分析到这里,相信大家已经差不多明白nodejs是如何控制响应按序返回的。最后我们看一下这些缓存的数据什么时候会被发送出去。前面代码已经贴过,当一个响应结束的时候,nodejs会做一些处理。
res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));
我们看看resOnFinish
function resOnFinish(req, res, socket, state, server) {
// 删除响应对应的请求
state.incoming.shift();
clearIncoming(req);
// 解除socket上挂载的响应对象
res.detachSocket(socket);
req.emit('close');
process.nextTick(emitCloseNT, res);
// 是不是最后一个响应
if (res._last) {
// 是则销毁socket
if (typeof socket.destroySoon === 'function') {
socket.destroySoon();
} else {
socket.end();
}
} else if (state.outgoing.length === 0) {
// 没有待处理的响应了,则重新设置超时时间,等待请求的到来,一定时间内没有请求则触发timeout事件
if (server.keepAliveTimeout && typeof socket.setTimeout === 'function') {
socket.setTimeout(server.keepAliveTimeout);
state.keepAliveTimeoutSet = true;
}
} else {
// 获取下一个要处理的响应
const m = state.outgoing.shift();
// 挂载到socket作为当前处理的响应
if (m) {
m.assignSocket(socket);
}
}
}
我们看到,nodejs处理完一个响应后,会做一些判断。分别有三种情况,我们分开分析。
1 是否是最后一个响应 什么情况下,会被认为是最后一个响应的?因为响应和请求是一一对应的,最后一个响应就意味着最后一个请求了,那么什么时候被认为是最后一个请求呢?当非pipeline的情况下,一个请求一个响应,然后关闭tcp连接,所以非pipeline的情况下,tcp上的第一个也是唯一一个请求就是最后一个请求。在pipeline的情况下,理论上就没有所谓的最后一个响应。但是实现上会做一些限制。在pipeline的情况下,每一个响应可以通过设置http响应头connection来定义是否发送该响应后就断开连接,我们看一下nodejs的实现。
// 是否显示删除过connection头,是则响应后断开连接,并标记当前响应是最后一个
if (this._removedConnection) {
this._last = true;
this.shouldKeepAlive = false;
} else if (!state.connection) {
/*
没有显示设置了connection头,则取默认行为
1 shouldKeepAlive默认为true
2 设置content-length或使用chunk模式才能区分响应报文编边界,才能支持keepalive
3 使用了代理,代理是复用tcp连接的,支持keepalive
*/
const shouldSendKeepAlive = this.shouldKeepAlive &&
(state.contLen || this.useChunkedEncodingByDefault || this.agent);
if (shouldSendKeepAlive) {
header += 'Connection: keep-alive\r\n';
} else {
this._last = true;
header += 'Connection: close\r\n';
}
}
另外当读端关闭的时候,也被认为是最后一个请求,毕竟不会再发送请求了。我们看一下读端关闭的逻辑。
function socketOnEnd(server, socket, parser, state) {
const ret = parser.finish();
if (ret instanceof Error) {
socketOnError.call(socket, ret);
return;
}
// 不允许半开关则终止请求的处理,不响应,关闭写端
if (!server.httpAllowHalfOpen) {
abortIncoming(state.incoming);
if (socket.writable) socket.end();
} else if (state.outgoing.length) {
// 允许半开关,并且还有响应需要处理,标记响应队列最后一个节点为最后的响应,处理完就关闭socket写端
state.outgoing[state.outgoing.length - 1]._last = true;
} else if (socket._httpMessage) {
// 没有等待处理的响应了,但是还有正在处理的响应,则标记为最后一个响应
socket._httpMessage._last = true;
} else if (socket.writable) {
// 否则关闭socket写端
socket.end();
}
}
以上就是nodejs中判断是否是最后一个响应的情况,如果一个响应被认为是最后一个响应,那么发送响应后就会关闭连接。
2 响应队列为空 我们继续看一下如果不是最后一个响应的时候,nodejs又是怎么处理的。如果当前的待处理响应队列为空,说明当前处理的响应是目前最后一个需要处理的,但是不是tcp连接上最后一个响应,这时候,nodejs会设置超时时间,如果超时还没有新的请求,则nodejs会关闭连接。
3 响应队列非空 如果当前待处理队列非空,处理完当前请求后会继续处理下一个响应。并从队列中删除该响应。我们看一下nodejs是如何处理下一个响应的。
// 把响应对象挂载到socket,标记socket当前正在处理的响应
ServerResponse.prototype.assignSocket = function assignSocket(socket) {
// 挂载到socket上,标记是当前处理的响应
socket._httpMessage = this;
socket.on('close', onServerResponseClose);
this.socket = socket;
this.emit('socket', socket);
this._flush();
};
我们看到nodejs是通过_httpMessage标记当前处理的响应的,配合响应队列来实现响应的按序返回。标记完后执行_flush发送响应的数据(如果这时候请求已经被处理完成)
OutgoingMessage.prototype._flush = function _flush() {
const socket = this.socket;
if (socket && socket.writable) {
const ret = this._flushOutput(socket);
};
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
// 之前设置了加塞,则操作socket先积攒数据
while (this[kCorked]) {
this[kCorked]--;
socket.cork();
}
const outputLength = this.outputData.length;
// 没有数据需要发送
if (outputLength <= 0)
return undefined;
const outputData = this.outputData;
// 加塞,让数据一起发送出去
socket.cork();
// 把缓存的数据写到socket
let ret;
for (let i = 0; i < outputLength; i++) {
const { data, encoding, callback } = outputData[i];
ret = socket.write(data, encoding, callback);
}
socket.uncork();
this.outputData = [];
this._onPendingData(-this.outputSize);
this.outputSize = 0;
return ret;
}
以上就是nodejs中对于pipeline的实现。