首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty NioEventLoop源码解读

Netty NioEventLoop源码解读

作者头像
良辰美景TT
发布2018-09-11 14:29:05
8130
发布2018-09-11 14:29:05
举报

  NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:I/O任务:即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。非IO任务:添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。以下是NioEventLoop的构造方法。

//这是NioEventLoop的构造方法,NioEventLoopGroup 是管理NioEventLoop的集合,executor则为任务执行池,SelectorProvider 用于构造Selector对象,RejectedExecutionHandler用于在executor满了执行的逻辑
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

在NioEventLoop类中,最重要的便是run方法,以下是run方法的源码:

    @Override
    protected void run() {
//这里是个死循环,里面的run方法逻辑会一直执行
        for (;;) {
            try {
//根据是队列里是否有任务执行不同的策略
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
//select内部具体的方法看下面的源码
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
//这里是I/O与executor里任务执行的比率,如果设为100则表示I/O的传先级最高
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
//处理网络I/O事件processSelectedKeys可以看下面的源码
                        processSelectedKeys();
                    } finally {
                      //最终不是会执行任务队列里的任务的
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        //传入一个执行了io的时间,根据io执行时间算出任务能够执行多长时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                      // runAllTasks看下面的源码
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

通过上面的run方法分析,内部有比较重要的三个方法, 我们下面来一一分析,其中比较重要的是select方法。里面最重要的逻辑便是判断执行selector的selectNow方法还是执行select方法。其中selectNow方法是非阻塞的,则select方法是阻塞的。同时这个方法也解决了JVM里Selector空循环的bug,解决思想是发现执行了512次select还是没有I/O事件,刚通过新建一个新的selector。具体代码如下(关键代码有相应的注释):

//这里处理的是select逻辑
    private void select(boolean oldWakenUp) throws IOException {
//这个Selector就 是 java.nio里的selector啦
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
//selectDeadLineNanos得到的是最早任务的的一个执行时间
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
//如果最早需要执行的任务时间在0.5秒内,则执行selector的sellectNow方法,selectNow方法是非阻塞的
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
//再次确认有任务则执行selectNow方法
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
 //如果没有队列里没有任务需要执行,则通过select阻塞方法,得到selectedKeys
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
//selectedKeys不为空,则说明有相应的I/O事件,跳出循环,的run方法里会处理具体的I/O事件
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
//如果最早任务的执行时间还没有相应的I/O事件,则把selectCnt置为1,重新开始内部的for循环
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//下面的逻辑用于处理java selector空轮询的bug
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

另一个比较重要的方法是processSelectedKeys方法,里面会处理具体的I/O事件,根据SelectionKey的readyops属性处理不同的I/O事件,具体代码分析如下:

//根据selectedKeys 不同,采用不同的方法处理
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
//调用selector.selectedKeys()方法得到selectedKeys
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

//这里处理的是没有优化过的SelectedKeys
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }
//循环处理SelectionKey
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
//具体的I/O事件在processSelectedKey方法里
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
//根据 readyOps 判断I/O事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//处理connect事件,
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              //处理write事件
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//处理read与accept事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

还有一个比较重要的方法便是runAllTasks,这个方法传入能够处理task的时间,在超过这个时间后会退出执行,以使线程能够执行I/O逻辑,代码逻辑如下:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
//从队列里取出执需要执行的task
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
//这个方法求出执行的最后时间
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
//执行任务
            safeExecute(task);

            runTasks ++;

//只有执行了64个任务才比较当前时间与deadline的大小,如果超过了则直接退出,这样做的原因是取得系统的nanoTime也是个相对耗时的操作
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
//继续从队列里拿出未执行的task
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

Netty是接收用户连接后是如何处理的

  下面来看看netty接收client端的请求后都做了那些事情,我们从上面的processSelectedKey方法里我们知道在处理SelectionKey.OP_ACCEPT事件的时候会调用 unsafe.read()方法,代码截图如下:

image.png

下面我们通过unsafe来一步步追踪netty是如何处理接收到的连接的,这里的unsafe对象是NioMessageUnsafe 对象,代码如下:

    private final class NioMessageUnsafe extends AbstractNioUnsafe {
//这里接收读到的buf对象
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
// config()方法得到的是关于channel的配置信息
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
//这里会调用父类的unsafe方法
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
//doReadMessages方法调用的是channel里的方法,在这里会调用NioServerSocketChannel类里的方法。通过readBuf会返回新建的SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
//这里对增加到readBuf里的对象进行处理通过pipeline对象,最终会调用到ServerBootstrapAcceptor类里的channelRead方法
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

下面来看看NioServerSocketChannel类里是如何处理doReadMessages的,主要的逻辑只是调用了java NIO的accept,然后返回SocketChannel,将返回的对象包装成Netty内部使用的对象。具体代码如下:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
//这里会调用java NIO里channel的accept方法,返回的也是java NIO里的SocketChannel对象
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
//新建NioSocketChannel对象,传的的parent就是当前NioServerSocketChannel, SocketChannel 就是刚刚调用accpt方法返回的对象。
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

ServerBootstrapAcceptor是一个ChannelInboundHandler类,里面的核心方法如下图所示:逻辑在图里都有相应的标注:

image.png

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.07.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Netty是接收用户连接后是如何处理的
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档