
The Java NIO Empty-Poll Bug and Netty's Workaround

Author: User5640963
Published 2020-03-19 09:52:38
This article is included in the column: 卯金刀GG

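1. The problem

Bug report: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302

In short: epoll is Linux's efficient I/O multiplexing mechanism, more efficient than select and poll. It is efficient because the set of monitored fds is maintained inside the kernel: registered fds are kept in a red-black tree, and a linked list holds the fds on which events have occurred. epoll_wait returns that ready list to the application, which then handles those fd events.

On Linux, epoll is generally the default choice for I/O multiplexing, and Java NIO's default Selector on Linux is epoll-based as well. The JDK's epoll implementation, however, has had bugs. The most famous one, usually called the Java NIO epoll bug, is this: Selector.select() / Selector.select(timeout), which should block, keeps waking up even though the number of selected keys is 0. The event loop therefore spins without doing any work and drives the CPU to 100%.

2. Reproduction steps from the official bug report

A DESCRIPTION OF THE PROBLEM : The NIO selector wakes up infinitely in this situation: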

0. server waits for a connection
1. client connects and writes a message
2. server accepts and registers OP_READ
3. server reads the message and removes OP_READ from the interest op set
4. client closes the connection
5. server writes a message (without any reading; OP_READ is definitely not set)
6. server's select wakes up infinitely with return value 0

3. Reproduction code
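For comparison with the reproduction below, this is what a healthy Selector should do: with nothing ready, select(timeout) blocks for roughly the whole timeout and then returns 0. A minimal sketch using only java.nio.channels.Selector; the class name EmptySelectBaseline and the 200 ms timeout are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Baseline (illustrative only): on a healthy JDK, select(timeout) with nothing
// registered blocks for about the whole timeout and then returns 0.
class EmptySelectBaseline {

    /** Calls select(timeoutMillis) once; returns {selectedKeys, elapsedMillis}. */
    static long[] selectOnce(long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open()) {
            long start = System.nanoTime();
            int selected = selector.select(timeoutMillis);
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new long[] {selected, elapsedMillis};
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = selectOnce(200);
        System.out.println("select returned " + r[0] + " after ~" + r[1] + " ms");
    }
}
```

On an affected JDK the symptom is the opposite pattern: select() returning 0 long before its timeout, over and over, which is what Netty's workaround counts.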

---------- BEGIN SOURCE ----------
TestServer.java

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
import java.util.logging.*;

public class TestServer {
    private static final long SLEEP_PERIOD = 5000L; // 5 seconds
    private static final int BUFFER_SIZE = 8192;
    private int port;

    public TestServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Throwable {
        if (args.length < 1) {
            System.err.println("Usage : java TestServer <port>");
            System.exit(0);
        }

        new TestServer(Integer.parseInt(args[0])).start();
    }

    public void start() throws Throwable {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket server = serverChannel.socket();
        server.bind(new InetSocketAddress(port));

        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        SocketChannel clientChannel = null;

        System.out.println("0. SERVER STARTED TO LISTEN");
        boolean writeNow = false;

        while (true) {
            try {
                // wait for selection
                int numKeys = selector.select();

                if (numKeys == 0) {
                    System.err.println("select wakes up with zero!!!");
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selected = it.next();
                    int ops = selected.interestOps();

                    try {
                        // process new connection
                        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
                            clientChannel = serverChannel.accept();
                            clientChannel.configureBlocking(false);

                            // register channel to selector
                            clientChannel.register(selector, SelectionKey.OP_READ, null);
                            System.out.println("2. SERVER ACCEPTED AND REGISTER READ OP : client - " + clientChannel.socket().getInetAddress());
                        }

                        if ((ops & SelectionKey.OP_READ) != 0) {
                            // read client message
                            System.out.println("3. SERVER READ DATA FROM client - " + clientChannel.socket().getInetAddress());
                            readClient((SocketChannel) selected.channel(), buffer);

                            // deregister OP_READ
                            System.out.println("PREV SET : " + selected.interestOps());
                            selected.interestOps(selected.interestOps() & ~SelectionKey.OP_READ);
                            System.out.println("NEW SET : " + selected.interestOps());

                            // sleep past the client's close (step 4), then write from
                            // another thread without re-enabling OP_READ (step 5)
                            Thread.sleep(SLEEP_PERIOD * 2);
                            new WriterThread(clientChannel).start();
                        }

                    } finally {
                        // remove from selected key set
                        it.remove();
                    }
                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
            }
        }
    }


    public void readClient(SocketChannel channel, ByteBuffer buffer) throws IOException {
        try {
            buffer.clear();

            int nRead = channel.read(buffer);

            if (nRead < 0) {
                channel.close();
                return;
            }

            if (buffer.position() != 0) {
                int size = buffer.position();
                buffer.flip();
                byte[] bytes = new byte[size];
                buffer.get(bytes);
                System.out.println("RECVED : " + new String(bytes));
            }
        } catch (IOException e) {
            System.err.println("IO Error : " + e.getMessage());
            channel.close();
        }
    }

    static class WriterThread extends Thread {
        private SocketChannel clientChannel;
        public WriterThread(SocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        public void run() {
            try {
                writeClient(clientChannel);
                System.out.println("5. SERVER WRITE DATA TO client - " + clientChannel.socket().getInetAddress());
            } catch (IOException e) {
                System.err.println("5. SERVER WRITE DATA FAILED : " + e);
            }
        }

        public void writeClient(SocketChannel channel) throws IOException {
            try {
                ByteBuffer buffer = ByteBuffer.wrap("zwxydfdssdfsd".getBytes());
                int total = buffer.limit();

                int totalWrote = 0;
                int nWrote = 0;

                while ((nWrote = channel.write(buffer)) >= 0) {
                    totalWrote += nWrote;
                    if (totalWrote == total) {
                        break;
                    }
                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
                channel.close();
            }
        }


    }
}


TestClient.java

import java.util.logging.*;
import java.io.*;
import java.net.*;

public class TestClient {
    private static final long SLEEP_PERIOD = 5000L; // 5 seconds
    private String host;
    private int port;

    public TestClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Throwable {
        if (args.length < 2 || args[0].equals("127.0.0.1") || args[0].equals("localhost")) {
            System.err.println("Usage : java TestClient <host name> <port> (host name should not be localhost)");
            System.exit(0);
        }

        new TestClient(args[0], Integer.parseInt(args[1])).start();
    }

    public void start() throws Throwable {
        Socket socket = new Socket(host, port);

        BufferedReader in = new BufferedReader(
            new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(
            new OutputStreamWriter(socket.getOutputStream()),
            true /* auto flush */);

        out.println("abcdef");

        System.out.println("1. CLIENT CONNECTED AND WROTE MESSAGE");

        Thread.sleep(SLEEP_PERIOD);

//         socket.shutdownOutput();
        socket.close();

        System.out.println("4. CLIENT SHUTDOWN OUTPUT");

        Thread.sleep(SLEEP_PERIOD * 3);
    }
}


---------- END SOURCE ----------
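The two programs above need two machines (TestClient refuses localhost). As a rough, hypothetical sketch, the same steps 0 to 6 can be run inside one JVM over the loopback interface, counting how often select(timeout) returns 0 within a fixed window. Because the report insists on a non-local client, this loopback version may not trigger the bug even on an affected JDK; on a JDK without the bug, the count should stay close to window / timeout. LoopbackReproSketch and countZeroReturns are names made up for this example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

// Hypothetical single-JVM sketch of the report's steps 0-6 over loopback.
class LoopbackReproSketch {

    /** Runs steps 0-6, then counts select(timeoutMillis) calls returning 0 within windowMillis. */
    static int countZeroReturns(long timeoutMillis, long windowMillis) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // 0. server listens (a blocking accept keeps the sketch short)
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // 1. client connects and writes a message
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            client.write(ByteBuffer.wrap("abcdef".getBytes(StandardCharsets.US_ASCII)));

            // 2. server accepts and registers OP_READ
            SocketChannel accepted = server.accept();
            accepted.configureBlocking(false);
            SelectionKey key = accepted.register(selector, SelectionKey.OP_READ);

            // 3. server reads the message, then removes OP_READ from the interest set
            selector.select(1000);
            selector.selectedKeys().clear();
            accepted.read(ByteBuffer.allocate(64));
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);

            // 4. client closes the connection
            client.close();
            Thread.sleep(100);

            // 5. server writes without reading (OP_READ is not set)
            try {
                accepted.write(ByteBuffer.wrap("zwxy".getBytes(StandardCharsets.US_ASCII)));
            } catch (IOException e) {
                // the peer may already have reset the connection; ignored in this sketch
            }

            // 6. count how often select() wakes up with 0 keys during the window
            int zeroReturns = 0;
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
            while (System.nanoTime() < deadline) {
                if (selector.select(timeoutMillis) == 0) {
                    zeroReturns++;
                }
                selector.selectedKeys().clear();
            }
            accepted.close();
            return zeroReturns;
        }
    }

    public static void main(String[] args) throws Exception {
        int n = countZeroReturns(50, 500);
        System.out.println("select() returned 0 " + n + " times in 500 ms");
    }
}
```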

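4. Netty's workaround: the rebuildSelector source code

Netty does not fix the JDK; it detects the symptom and replaces the Selector. In select(), selectCnt counts how many times in a row Selector.select(timeoutMillis) returned before the timeout elapsed, without selecting any key, without being woken up, and with no pending or scheduled tasks. When selectCnt reaches SELECTOR_AUTO_REBUILD_THRESHOLD (512 by default), selectRebuildSelector() calls rebuildSelector(), which ends up in rebuildSelector0(): it opens a new Selector, re-registers every valid channel on it with the same interest ops and attachment, and closes the old Selector.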
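To isolate the decision rule before reading the full source, here is a simplified, hypothetical model of how selectCnt evolves. It is not Netty's API: PrematureSelectDetector, onEmptySelect and onProductiveSelect are invented names, and the timing check mirrors the time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos test in the code below.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of NioEventLoop's selectCnt bookkeeping; not a Netty class.
class PrematureSelectDetector {
    private final int rebuildThreshold; // plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;              // consecutive empty select() calls

    PrematureSelectDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records a select(timeoutMillis) call that returned 0 keys with no wakeup
     * and no pending tasks. Returns true when the Selector should be rebuilt.
     */
    boolean onEmptySelect(long elapsedNanos, long timeoutMillis) {
        selectCnt++;
        if (elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) {
            // The whole timeout elapsed: a legitimate empty select, so the streak restarts.
            selectCnt = 1;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many premature returns in a row: rebuild, then start counting again.
            selectCnt = 1;
            return true;
        }
        return false;
    }

    /** A select() that found keys, was woken up, or saw pending tasks ends the streak. */
    void onProductiveSelect() {
        selectCnt = 0;
    }

    int currentCount() {
        return selectCnt;
    }

    public static void main(String[] args) {
        PrematureSelectDetector detector = new PrematureSelectDetector(512);
        int calls = 1;
        // Simulate select(1000) returning 0 after only 1 microsecond, again and again.
        while (!detector.onEmptySelect(1_000L, 1000)) {
            calls++;
        }
        System.out.println("rebuild requested on call " + calls);
    }
}
```

Counting a streak instead of reacting to one premature return keeps an occasional spurious wakeup from triggering a rebuild; only a long run of them is treated as the JDK bug.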

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            // counts consecutive select() calls (used to detect premature returns)
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // default: 512
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
    private Selector selectRebuildSelector(int selectCnt) throws IOException {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn(
                "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);

        rebuildSelector();
        Selector selector = this.selector;

        // Select again to populate selectedKeys.
        selector.selectNow();
        return selector;
    }
    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
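The same migration can be sketched with plain JDK NIO, without Netty's SelectorTuple, AbstractNioChannel or NioTask types. This is a simplified version under stated assumptions: unlike rebuildSelector0(), it lets a failed re-registration propagate instead of closing that one channel and continuing, and it has no wrapped/unwrapped Selector distinction. SelectorRebuilder is a hypothetical name.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Simplified, JDK-only sketch of the rebuild step; not Netty's implementation.
class SelectorRebuilder {

    /** Moves every valid registration from oldSelector to a fresh Selector, then closes oldSelector. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        int moved = 0;
        for (SelectionKey key : oldSelector.keys()) {
            // Skip keys that are already cancelled or already registered with the new Selector.
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            // Same channel, same interest set, same attachment, new Selector.
            key.channel().register(newSelector, interestOps, attachment);
            moved++;
        }
        // Closing the old Selector deregisters the cancelled keys from it.
        oldSelector.close();
        System.out.println("Migrated " + moved + " channel(s) to the new Selector.");
        return newSelector;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            channel.configureBlocking(false);
            Selector oldSelector = Selector.open();
            channel.register(oldSelector, SelectionKey.OP_ACCEPT, "server");

            try (Selector newSelector = rebuild(oldSelector)) {
                SelectionKey newKey = channel.keyFor(newSelector);
                System.out.println("new key ops=" + newKey.interestOps() + ", attachment=" + newKey.attachment());
            }
        }
    }
}
```

Cancelling the old key before registering on the new Selector is safe because a channel may be registered with several selectors at once; the cancelled key is only dropped from the old Selector at its next selection operation or when that Selector is closed.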

Improve a little every day.

Postscript: the threshold in selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD (512 by default) is configurable.
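As far as I know, in Netty 4.1 the threshold is read once, in NioEventLoop's static initializer, from the system property io.netty.selectorAutoRebuildThreshold (default 512), and a value below MIN_PREMATURE_SELECTOR_RETURNS (3) disables automatic rebuilding. The sketch below mirrors that rule using the JDK's Integer.getInteger instead of Netty's internal SystemPropertyUtil; RebuildThresholdConfig is a hypothetical helper, not a Netty class.

```java
// Hypothetical helper mirroring how the threshold is resolved; not a Netty class.
class RebuildThresholdConfig {
    static final String PROPERTY = "io.netty.selectorAutoRebuildThreshold";
    static final int DEFAULT_THRESHOLD = 512;
    static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

    /** Returns the effective threshold; 0 means automatic Selector rebuilding is off. */
    static int resolve() {
        // Integer.getInteger falls back to the default when the property is unset or not a number.
        int threshold = Integer.getInteger(PROPERTY, DEFAULT_THRESHOLD);
        return threshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : threshold;
    }

    public static void main(String[] args) {
        // Same effect as starting the JVM with -Dio.netty.selectorAutoRebuildThreshold=1024.
        // With real Netty this must run before NioEventLoop is first loaded.
        System.setProperty(PROPERTY, "1024");
        System.out.println("effective threshold: " + resolve());
    }
}
```

In practice the simplest route is the JVM flag -Dio.netty.selectorAutoRebuildThreshold=1024 (or 0 to turn rebuilding off), since the property has to be in place before NioEventLoop is first loaded.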

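This article is part of the Tencent Cloud self-media sync program and is shared from the author's personal site/blog. For infringement concerns, contact cloudcommunity@tencent.com for removal.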
