1、问题提出 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302 简单来说:epoll是Linux下一种高效的IO多路复用机制,相较于select和poll更加高效。其高效的原因是把对fd事件的管理放到内核中完成:内核基于红黑树+链表的数据结构实现,红黑树保存所有被监听的fd,链表存放已有事件发生(就绪)的fd集合,调用epoll_wait时将这些就绪的fd返回给应用程序,由应用程序来处理这些fd上的事件。 Linux下的IO多路复用一般默认就是epoll,Java NIO在Linux下默认也使用epoll机制,但是JDK中epoll的实现存在漏洞,其中最有名的就是java nio epoll bug:即使关注的事件都没有就绪、select返回的数量为0,本应阻塞的Selector.select()/Selector.select(timeout)仍会不断被唤醒返回(空轮询),导致CPU 100%的问题。 2、官方Bug库中的重现步骤 A DESCRIPTION OF THE PROBLEM : The NIO selector wakes up infinitely in this situation.
0. server waits for connection 1. client connects and writes a message 2. server accepts and registers OP_READ 3. server reads the message and removes OP_READ from the interest op set 4. client closes the connection 5. server writes a message (without any reading; OP_READ is certainly not set) 6. server's select wakes up infinitely with return value 0 3、重现代码
---------- BEGIN SOURCE ----------
TestServer.java
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
import java.util.logging.*;
/**
 * Minimal NIO server that reproduces JDK bug 6670302 (Selector.select() returning 0 forever).
 *
 * <p>Flow:
 * <ol>
 *   <li>Accept a client and register OP_READ.</li>
 *   <li>After one read, drop OP_READ from the key's interest set.</li>
 *   <li>Sleep so the client has time to close its end.</li>
 *   <li>Write to the client from a separate thread.</li>
 * </ol>
 * Every {@code select()} that returns 0 is logged to stderr.
 */
public class TestServer {
    private static final long SLEEP_PERIOD = 5000L; // 5 seconds
    private static final int BUFFER_SIZE = 8192;
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public TestServer(int port) {
        this.port = port;
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the port to listen on
     */
    public static void main(String[] args) throws Throwable {
        if (args.length < 1) {
            System.err.println("Usage : java TestServer <port>");
            // Usage error: exit with a non-zero status so callers/scripts can detect it.
            System.exit(1);
        }
        new TestServer(Integer.parseInt(args[0])).start();
    }

    /**
     * Binds the server socket and runs the selector loop forever.
     *
     * <p>IOExceptions raised inside an iteration are logged and the loop continues.
     *
     * @throws Throwable on setup failures, or if the loop thread is interrupted while sleeping
     */
    public void start() throws Throwable {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket server = serverChannel.socket();
        server.bind(new InetSocketAddress(port));
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("0. SERVER STARTED TO LISTEN");
        while (true) {
            try {
                // wait for selection
                int numKeys = selector.select();
                if (numKeys == 0) {
                    System.err.println("select wakes up with zero!!!");
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selected = it.next();
                    // Remove up front so the key leaves the selected set on every path.
                    it.remove();
                    if (!selected.isValid()) {
                        continue;
                    }
                    // Dispatch on readiness (readyOps), not on what the key is interested in.
                    if (selected.isAcceptable()) {
                        acceptClient(serverChannel, selector);
                    }
                    if (selected.isValid() && selected.isReadable()) {
                        readAndScheduleWrite(selected, buffer);
                    }
                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
            }
        }
    }

    /**
     * Accepts one pending connection, if any, and registers it with the selector for OP_READ.
     */
    private static void acceptClient(ServerSocketChannel serverChannel, Selector selector)
            throws IOException {
        SocketChannel clientChannel = serverChannel.accept();
        // In non-blocking mode accept() returns null when no connection is actually pending.
        if (clientChannel == null) {
            return;
        }
        clientChannel.configureBlocking(false);
        // register channel to selector
        clientChannel.register(selector, SelectionKey.OP_READ, null);
        System.out.println("2. SERVER ACCEPTED AND REGISTER READ OP : client - " + clientChannel.socket().getInetAddress());
    }

    /**
     * Handles a readable client key (reproduction steps 3 and 5).
     *
     * <p>Reads once from the key's own channel and removes OP_READ from its interest set. It then
     * sleeps for {@code 2 * SLEEP_PERIOD} and starts a {@link WriterThread} for that channel.
     */
    private void readAndScheduleWrite(SelectionKey selected, ByteBuffer buffer)
            throws IOException, InterruptedException {
        // Use the channel attached to this key, not whichever client was accepted last.
        SocketChannel clientChannel = (SocketChannel) selected.channel();
        System.out.println("3. SERVER READ DATA FROM client - " + clientChannel.socket().getInetAddress());
        readClient(clientChannel, buffer);
        // readClient closes the channel on EOF/error, which cancels the key. interestOps() on a
        // cancelled key throws CancelledKeyException, so stop here in that case.
        if (!selected.isValid()) {
            return;
        }
        // deregister OP_READ
        System.out.println("PREV SET : " + selected.interestOps());
        selected.interestOps(selected.interestOps() & ~SelectionKey.OP_READ);
        System.out.println("NEW SET : " + selected.interestOps());
        // Give the client time to close its end before the server writes (step 4 of the repro).
        Thread.sleep(SLEEP_PERIOD * 2);
        new WriterThread(clientChannel).start();
    }

    /**
     * Reads once into {@code buffer} and prints what was received.
     *
     * <p>Closes the channel on end-of-stream or on a read error.
     *
     * @throws IOException if closing the channel fails
     */
    public void readClient(SocketChannel channel, ByteBuffer buffer) throws IOException {
        try {
            buffer.clear();
            int nRead = channel.read(buffer);
            if (nRead < 0) {
                channel.close();
                return;
            }
            if (buffer.position() != 0) {
                int size = buffer.position();
                buffer.flip();
                byte[] bytes = new byte[size];
                buffer.get(bytes);
                System.out.println("RECVED : " + new String(bytes));
            }
        } catch (IOException e) {
            System.err.println("IO Error : " + e.getMessage());
            channel.close();
        }
    }

    /**
     * Writes a fixed message to the client on its own thread (step 5 of the reproduction).
     */
    static class WriterThread extends Thread {
        private final SocketChannel clientChannel;

        public WriterThread(SocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        @Override
        public void run() {
            try {
                writeClient(clientChannel);
                System.out.println("5. SERVER WRITE DATA TO client - " + clientChannel.socket().getInetAddress());
            } catch (IOException e) {
                System.err.println("5. SERVER WRITE DATA FAILED : " + e);
            }
        }

        /**
         * Writes the whole message.
         *
         * <p>Closes the channel if a write fails.
         *
         * @throws IOException if closing the channel fails
         */
        public void writeClient(SocketChannel channel) throws IOException {
            try {
                ByteBuffer buffer = ByteBuffer.wrap("zwxydfdssdfsd".getBytes());
                // A non-blocking write may accept only part of the buffer (or nothing), so keep
                // writing until every byte has been handed to the socket.
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
                channel.close();
            }
        }
    }
}
TestClient.java
import java.util.logging.*;
import java.io.*;
import java.net.*;
/**
 * Client side of the JDK bug 6670302 reproduction.
 *
 * <p>Connects and sends one line, waits {@code SLEEP_PERIOD}, then closes the connection while
 * the server still holds it. It then stays alive for {@code 3 * SLEEP_PERIOD}.
 */
public class TestClient {
    private static final long SLEEP_PERIOD = 5000L; // 5 seconds
    private final String host;
    private final int port;

    /**
     * @param host server host name (should not be a loopback name, see {@link #main})
     * @param port server TCP port
     */
    public TestClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Entry point.
     *
     * <p>{@code args[0]} is the host and {@code args[1]} is the port. The literal names
     * "127.0.0.1" and "localhost" are rejected. NOTE(review): presumably the original report
     * found the issue does not reproduce over loopback — confirm.
     */
    public static void main(String[] args) throws Throwable {
        if (args.length < 2 || args[0].equals("127.0.0.1") || args[0].equals("localhost")) {
            System.err.println("Usage : java TestClient <host name> <port> (host name should not be localhost)");
            // Usage error: exit with a non-zero status so callers/scripts can detect it.
            System.exit(1);
        }
        new TestClient(args[0], Integer.parseInt(args[1])).start();
    }

    /**
     * Runs reproduction steps 1 and 4.
     *
     * <p>The socket is closed by try-with-resources even if writing or sleeping fails.
     */
    public void start() throws Throwable {
        try (Socket socket = new Socket(host, port)) {
            PrintWriter out = new PrintWriter(
                    new OutputStreamWriter(socket.getOutputStream()),
                    true /* auto flush */);
            out.println("abcdef");
            System.out.println("1. CLIENT CONNECTED AND WROTE MESSAGE");
            Thread.sleep(SLEEP_PERIOD);
        } // leaving the try block closes the socket (step 4: client closes the connection)
        System.out.println("4. CLIENT CLOSED CONNECTION");
        Thread.sleep(SLEEP_PERIOD * 3);
    }
}
---------- END SOURCE ----------
4、Netty的解决方案:rebuildSelector源码
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
//统计数量
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD//默认512) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
/**
 * Replaces the selector after it returned prematurely too many times in a row.
 *
 * @param selectCnt number of consecutive premature {@code select} returns (used for logging)
 * @return the selector held in {@code this.selector} once {@code rebuildSelector()} has returned,
 *     after a non-blocking {@code selectNow()} has been run on it
 * @throws IOException if {@code selectNow()} fails
 */
private Selector selectRebuildSelector(int selectCnt) throws IOException {
    // Repeated early wake-ups with nothing to do: swap in a fresh selector to escape the spin.
    logger.warn(
            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
            selectCnt, this.selector);
    rebuildSelector();
    final Selector rebuilt = this.selector;
    // A non-blocking select fills the selected-key set so the caller has keys to process.
    rebuilt.selectNow();
    return rebuilt;
}
/**
 * Replaces the current selector with a freshly opened one and migrates registered channels to it.
 *
 * <p>A new selector tuple is opened via openSelector(). Each key of the old selector that is
 * still valid, and whose channel is not yet registered with the new selector, is handled the same
 * way: it is cancelled and its channel is re-registered with the same interest ops and attachment.
 * Then the selector fields are switched and the old selector is closed.
 *
 * <p>Failure handling:
 * <ul>
 *   <li>If the new selector cannot be opened, the error is logged and the old selector is kept.</li>
 *   <li>If a channel fails to re-register and its attachment is an AbstractNioChannel, that
 *   channel is closed.</li>
 *   <li>For any other attachment, the attachment is cast to NioTask and passed to
 *   invokeChannelUnregistered.</li>
 * </ul>
 */
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
// Nothing to rebuild when there is no current selector.
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
// Skip keys that are no longer valid, or whose channel is already registered with the new selector.
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
// Carry the same interest set and attachment over to the new registration.
int interestOps = key.interestOps();
// Cancel the old key so the channel is no longer tracked by the old selector.
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
// Re-registration failed: close the Netty channel, or otherwise treat the attachment as an
// NioTask and notify it that its channel was unregistered.
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// Switch to the new selector. NOTE(review): channels were registered with unwrappedSelector, so
// `selector` may be a wrapper around it — confirm in openSelector().
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
// Close the old selector.
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
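每天提高一点点 后记:关于selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD中的阈值(默认512),可以通过系统属性-Dio.netty.selectorAutoRebuildThreshold=N来设置;若N小于MIN_PREMATURE_SELECTOR_RETURNS(即3),则会禁用Selector的自动重建。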