在上一篇文章里我们主要介绍了 tomcat NIO 中的 acceptor 线程,其中包括了server 监听 socket 的初始化,端口绑定,acceptor 线程的启动,接受连接请求,将请求事件注册到 poller 线程。在这里我们主要介绍 poller 线程。
tomcat NIO 架构中会有 poller 线程,每一个 poller 实例都有一个 NIO selector对象,主要用于监测注册在原始 scoket 上的事件是否发生。每一个 poller 实例同时也是会拥有一个事件队列实例 SynchronizedQueue<PollerEvent>,用来存放发生的事件,一般该事件队列的 items 由前一篇文章介绍的 acceport 线程放入。对于 poller thread,主要包括如下 items:
启动poller线程:
poller 线程的启动主要在以前文章中介绍的架构类 NioEndpoint 的 startInternal 方法中完成,相关核心源代码如下:
public void startInternal() throws Exception {
//here we ignore other code section which not related with poller in this method to save space
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
添加事件到事件队列
该工作主要由 Poller 类的 register 方法和 addEvent 方法完成,核心源码如下:
private Selector selector
private AtomicLong wakeupCounter = new AtomicLong(0);
private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
对原始socket注册事件
该工作主要由 Poller 对象的 events() 方法以及 PollerEvent 对象的 run() 方法完成,核心源码如下:
//Poller
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
pe.run();
pe.reset();
if (running && !paused && eventCache != null) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error(sm.getString("endpoint.nio.pollerEventError"), x);
}
}
return result;
}
//PollerEvent
public void run() {
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
try {
if (key == null) {
try {
socket.socketWrapper.close();
} catch (Exception ignore) {
}
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
} catch (Exception ignore) {}
}
}
}
Poller线程核心逻辑
该工作主要由 Poller 类的 run() 方法和 processKey() 方法以及之前文章介绍的AbstractEndpoint 的 processSocket() 方法完成,核心源码如下:
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid() && socketWrapper != null) {
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
if (sk.isReadable()) {
if (socketWrapper.readOperation != null) {
if (!socketWrapper.readOperation.process()) {
closeSocket = true;
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
closeSocket = true;
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
} else {
cancelledKey(sk, socketWrapper);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk, socketWrapper);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
//AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
目前先写到这里,下一篇文章里我们继续介绍 tomcat NIO 中 poller 线程的阻塞与唤醒。