Jetty的请求入口
ServerConnector.java
的 accepted
方法(ServerSocketChannel
#accept
后的处理逻辑)。
一个请求的流程:
Acceptor
监听连接请求,当有连接请求到达时就接受连接,一个连接对应一个 Channel,Acceptor 将 Channel 交给 ManagedSelector 来处理。
ManagedSelector
把 Channel 注册到 Selector 上,并创建一个 EndPoint
和 Connection
跟这个 Channel 绑定,接着就不断地检测 I/O 事件。
EndPoint
的方法拿到一个 Runnable,并扔给线程池执行。
Connection
注册到 EndPoint
中的。
EndPoint
的接口方法来读数据。
Connection
解析读到的数据,生成请求对象并交给 Handler
组件去处理。
Jetty 的 Selector 由 SelectorManager
类管理,而被管理的 Selector 叫作 ManagedSelector
。 (这里SelectorManager
的具体实现类是 ServerConnector
的内部类 ServerConnectorManager
。) SelectorManager 内部有一个 ManagedSelector 数组,真正干活的是 ManagedSelector。 主要做两个事情:
ManagedSelector
来处理 ChannelManagedSelector
// SelectorManager.java
public void accept(SelectableChannel channel, Object attachment)
{
// 选择一个 ManagedSelector 来处理 Channel
final ManagedSelector selector = chooseSelector();
// 提交一个任务 Accept 给 ManagedSelector
selector.submit(selector.new Accept(channel, attachment));
}
ManagedSelector
在处理这个任务Accept
主要做了两步:
register
方法把 Channel 注册到 Selector 上,拿到一个 SelectionKey。// ManagedSelector$Accept.java 内部类
@Override
public void update(Selector selector){
try{
// 把 Channel 注册到 Selector 上,拿到一个 SelectionKey
key = channel.register(selector, 0, attachment);
execute(this); // 执行当前Runnable, 跳转 run()
} catch (Throwable x) {
IO.close(channel);
_selectorManager.onAcceptFailed(channel, x);
LOG.debug(x);
}
}
@Override
public void run(){
try{
// 创建一个 EndPoint 和 Connection,并跟这个 SelectionKey(Channel)绑在一起
createEndPoint(channel, key);
_selectorManager.onAccepted(channel);
} catch (Throwable x){
LOG.debug(x);
failed(x);
}
}
EndPoint
和 Connection
,并跟这个 SelectionKey(Channel)绑在一起:// ManagedSelector.java
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException{
//1. 创建 Endpoint
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
//2. 创建 Connection
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
//3. 把 Endpoint、Connection 和 SelectionKey 绑在一起
endPoint.setConnection(connection);
selectionKey.attach(endPoint);
endPoint.onOpen();
endPointOpened(endPoint);
// 将Connection 注入到 EndPoint, 跳转 Connection 的 onOpen 方法 (内部转 fillInterested)
_selectorManager.connectionOpened(connection);
}
如上,HttpConnection
(Connection 的具体实现类之一) 并不会主动向 EndPoint
读取数据,而是向在 EndPoint
中注册一堆回调方法:
// HttpConnection.java
public void onOpen(){
super.onOpen();
if (isRequestBufferEmpty())
fillInterested();
else
getExecutor().execute(this);
}
public void fillInterested(){
// 告诉 EndPoint,数据到了你就调我这些回调方法 _readCallback 吧,有点异步 I/O 的感觉,也就是说 Jetty 在应用层面模拟了异步 I/O 模型。
getEndPoint().fillInterested(_readCallback);
}
这时候,ManagedSelector
启动时候的任务 EatWhatYouKill
的 无限循环,监测到SelectorProducer
的 processSelected
方法选择 出一个 Runnable 则,会自动执行。 这个 Runnable 从上步骤来看,则是 调用EndPoint
的 onSelected 方法返回一个 Runnable,然后把这个 Runnable 直接执行或扔给线程池执行。
// ManagedSelector.java
_selectorManager.execute(_strategy::produce);
// ManagedSelector$SelectorProducer.java
public Runnable produce() {
while (true) {
Runnable task = processSelected();
if (task != null) {
return task;
}
processUpdates();
updateKeys();
// 唤醒select
if (!select()) {
return null;
}
}
}
具体里面就是 调用 EndPoint
的 onSelected
方法
// ManagedSelector$SelectorProducer.java
...
if (attachment instanceof Selectable){
// Try to produce a task
Runnable task = ((Selectable)attachment).onSelected();
if (task != null)
return task;
}
(这里EndPoint
的具体实现类是 ChannelEndPoint
)
这里怎么选择出这个 Runnable 呢??
// ChannelEndPoint.java
public Runnable onSelected() {
....
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
return task;
}
很简单,依据 selector 监听到连接上是读就绪(channel通道中有数据可读),还算写就绪(channel通道中有数据可写)。
AbstractConnection$ReadCallback
,即是 HttpConnection
的 onFillable() 方法
调用 EndPoint 的接口去读数据,读完后让 HTTP 解析器HttpParser
去解析字节流,HTTP 解析器会将解析后的数据,包括请求行、请求头相关信息存到 Request 对象里, 然后丢给我们的 第一个Handler。
//HttpConnection.java
public void onFillable(){
// HTTP 解析器去解析字节流
boolean handle = parseRequestBuffer();
...
// 将解析后的数据,包括请求行、请求头相关信息存到 Request 对象, 然后丢给我们的 第一个Handler
if (handle){
boolean suspended = !_channel.handle();
...
}
//HttpChannel.java
public boolean handle(){
...
dispatch(DispatcherType.REQUEST, () -> {
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) {
customizer.customize(getConnector(), _configuration, _request);
if (_request.isHandled())
return;
}
getServer().handle(HttpChannel.this);
});
....
}
比如HttpChannelOverHttp
(HttpChannel的具体实现类之一)
AbstractConnection$ReadCallback
,即是 HttpConnection
的 onFillable() 方法
Connection 调用 Handler 进行业务处理,Handler 会通过 Response 对象来操作响应流,向流里面写入数据,HttpConnection 再通过 EndPoint 把数据写到 Channel,这样一次响应就完成了。
by 斯武丶风晴 https://my.oschina.net/langxSpirit
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有