在上一篇文章里我们主要介绍了 tomcat NIO 中响应数据的写入,在这里我们主要介绍 BlockPoller 线程。
tomcat NIO 架构中会有一个 BlockPoller 线程,该线程主要处理阻塞的读写操作。对于请求体数据读取,根据以前文章一般由 tomcat io 线程进行,当数据不可读的时候的时候(客户端数据未发送完毕),会注册封装的 OPEN_READ 事件到 BlockPoller 线程中,然后阻塞当前线程(一般为tomcat io线程)。对于响应数据的写入,根据以前文章一般也由 tomcat io 线程进行,当数据不可写的时候(原始 socket 发送缓冲区满),会注册封装的 OPEN_WRITE 事件对象到 BlockPoller 线程中,然后阻塞当前线程。
BlockPoller 实例都有一个 NIO selector 对象,主要用于监测注册在原始 scoket 上的事件是否发生。该实例有事件队列 SynchronizedQueue<PollerEvent>,用来存放发生的事件,一般该事件队列的元素由前一篇文章介绍的 tomcat io 线程放入(当请求体不可读或者响应数据不可写的时候)。对于该 block poller thread ,主要包含如下 :
启动BlockPoller线程
该线程的启动主要在以前文章中介绍的架构类 NioBlockingSelector 的 open() 方法中完成,相关核心源代码如下:
public void open(String name, Selector selector) {
sharedSelector = selector;
poller = new BlockPoller();
poller.selector = sharedSelector;
poller.setDaemon(true);
poller.setName(name + "-BlockPoller");
poller.start();
}
添加事件到事件队列
该工作主要由 BlockPoller 类的 add() 方法完成,核心源码如下:
protected Selector selector = null;
protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
if (key == null) {
return;
}
NioChannel nch = key.getSocket();
final SocketChannel ch = nch.getIOChannel();
if (ch == null) {
return;
}
Runnable r = new RunnableAdd(ch, key, ops, ref);
events.offer(r);
wakeup();
}
public void wakeup() {
if (wakeupCounter.addAndGet(1)==0) {
selector.wakeup();
}
}
对原始socket注册事件
该工作主要由 BlockPoller 类的 events() 方法和队列元素 RunnableAdd 类的 run() 方法完成,核心源码如下:
//BlockPoller
public boolean events() {
Runnable r = null;
int size = events.size();
for (int i = 0; i < size && (r = events.poll()) != null; i++) {
r.run();
}
return (size > 0);
}
//RunnableAdd
public void run() {
SelectionKey sk = ch.keyFor(selector);
try {
if (sk == null) {
sk = ch.register(selector, ops, key);
ref.key = sk;
} else if (!sk.isValid()) {
cancel(sk, key, ops);
} else {
sk.interestOps(sk.interestOps() | ops);
}
} catch (CancelledKeyException cx) {
cancel(sk, key, ops);
} catch (ClosedChannelException cx) {
cancel(null, key, ops);
}
}
BlockPoller线程核心逻辑
该工作主要由 BlockPoller 对象实例的 run() 方法和 countDown() 方法完成,核心源码如下:
public void run() {
while (run) {
try {
events();
int keyCount = 0;
try {
int i = wakeupCounter.get();
if (i > 0) {
keyCount = selector.selectNow();
} else {
wakeupCounter.set(-1);
keyCount = selector.select(1000);
}
wakeupCounter.set(0);
if (!run) {
break;
}
} catch (NullPointerException x) {
// sun bug 5076772 on windows JDK 1.5
if (selector == null) {
throw x;
}
if (log.isDebugEnabled()) {
log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
}
continue;
} catch (CancelledKeyException x) {
// sun bug 5076772 on windows JDK 1.5
if (log.isDebugEnabled()) {
log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
}
continue;
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("nioBlockingSelector.selectError"), x);
continue;
}
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (run && iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
try {
iterator.remove();
sk.interestOps(sk.interestOps() & (~sk.readyOps()));
if (sk.isReadable()) {
countDown(socketWrapper.getReadLatch());
}
if (sk.isWritable()) {
countDown(socketWrapper.getWriteLatch());
}
} catch (CancelledKeyException ckx) {
sk.cancel();
countDown(socketWrapper.getReadLatch());
countDown(socketWrapper.getWriteLatch());
}
}
} catch (Throwable t) {
log.error(sm.getString("nioBlockingSelector.processingError"), t);
}
}
events.clear();
if (selector.isOpen()) {
try {
selector.selectNow();
} catch (Exception ignore) {
if (log.isDebugEnabled())
log.debug("", ignore);
}
}
try {
selector.close();
} catch (Exception ignore) {
if (log.isDebugEnabled())
log.debug("", ignore);
}
}
public void countDown(CountDownLatch latch) {
if (latch == null) {
return;
}
latch.countDown();
}
目前先写到这里,下一篇文章里我们继续介绍 tomcat NIO 中的 block poller 线程的阻塞与唤醒。