前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty源码分析四之客户端接入与数据写出

netty源码分析四之客户端接入与数据写出

作者头像
山行AI
发布2019-10-24 14:48:52
1.1K0
发布2019-10-24 14:48:52
举报
文章被收录于专栏:山行AI山行AI

上文中分析了ServerBootstrap的初始化流程,其实也就是NioServerSockerChannel的轮询线程reactor线程的初始化过程。本节咱们聊一聊客户端也就是NioSocketChannel的接入方式。

在上文Bootstrap初始化流程分析中我们知道,在NioServerSocketChannel进行register时,会调用eventLoop.execute方法:

代码语言:javascript
复制
//这里是传入的eventLoopAbstractChannel.this.eventLoop = eventLoop;//如果是内部线程if (eventLoop.inEventLoop()) {    register0(promise);} else {    try {        //如果不是内部线程        eventLoop.execute(new Runnable() {            @Override            public void run() {                register0(promise);            }        });

由于调用方法的线程是进行初始化操作的main线程,所以会进入eventLoop.execute方法,调用的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

代码语言:javascript
复制
@Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            startThread();            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }

在startThread方法中,会调用doStartThread方法,然后调用eventLoop线程的run方法:

代码语言:javascript
复制
   private void startThread() {        //检查下线程是否已经启动        if (state == ST_NOT_STARTED) {            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {                doStartThread();            }        }    }
     /**        * SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法,        * 将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行        */   private void doStartThread() {       assert thread == null;       System.out.println("do start Thread:" + executor);       //executor 默认是ThreadPerTaskExecutor       //该线程就是executor创建,对应netty的reactor线程实体       //默认情况下,ThreadPerTaskExecutor 在每次执行execute       // 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体       executor.execute(new Runnable() {           @Override           public void run() {               thread = Thread.currentThread();               if (interrupted) {                   thread.interrupt();               }
               boolean success = false;               updateLastExecutionTime();               try {                   //调用的是io.netty.channel.nio.NioEventLoop.run                   SingleThreadEventExecutor.this.run();                   success = true;               } catch (Throwable t) {                   logger.warn("Unexpected exception from an event executor: ", t);         ............           

io.netty.channel.nio.NioEventLoop#run:

代码语言:javascript
复制
 @Override    protected void run() {        for (;;) {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));                        if (wakenUp.get()) {                           //轮询结束,对wakenUp进行重置,方便下一次轮询                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                //关于ioRatio见: https://github.com/netty/netty/issues/6058                //ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }      .........

ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关

每次轮询之后都会进入到processSelectedKeys方法:

代码语言:javascript
复制
private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

processSelectedKeysOptimized方法:

代码语言:javascript
复制
 private void processSelectedKeysOptimized() {        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // null out entry in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.keys[i] = null;
            final Object a = k.attachment();            //在ServerBootStrap初始化创建NioServerSocketChannel时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //是否需要再一次select            if (needsToSelectAgain) {                // null out entries in the array to allow to have it GC'ed once the Channel close                // See https://github.com/netty/netty/issues/2363                selectedKeys.reset(i + 1);
                selectAgain();                i = -1;            }        }    }

在ServerBootStrap初始化创建NioServerSocketChannel时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用。接着看io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法的关键部分:

代码语言:javascript
复制
 int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking    // See https://github.com/netty/netty/issues/924    int ops = k.interestOps();    ops &= ~SelectionKey.OP_CONNECT;    k.interestOps(ops);
    unsafe.finishConnect();}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write    ch.unsafe().forceFlush();}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {    //进入的是    unsafe.read();}

在reactor线程初始化完成后readyOps的值是OP_ACCEPT的值。当有新连接接入时,会进入unsafe.read()方法,毫无疑问,这个unsafe指的是NioMessageUnsafe,关于这点在之前的几篇文章中都有提到。

我们来看read方法,io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read:

代码语言:javascript
复制
 try {        do {            int localRead = doReadMessages(readBuf);            if (localRead == 0) {                break;            }            if (localRead < 0) {                closed = true;                break;            }
            allocHandle.incMessagesRead(localRead);        } while (allocHandle.continueReading());    } catch (Throwable t) {        exception = t;    }
    int size = readBuf.size();    for (int i = 0; i < size; i ++) {        readPending = false;        pipeline.fireChannelRead(readBuf.get(i));    }    readBuf.clear();

我们一步步来看,首先doReadMessages(readBuf):

代码语言:javascript
复制
 @Override    protected int doReadMessages(List<Object> buf) throws Exception {        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {            if (ch != null) {                buf.add(new NioSocketChannel(this, ch));                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }
        return 0;    }

new NioSocketChannel并将channel放入buf中,其中buf的结构是一个List,需要注意的一条是,在初始化NioSocketChannel时建立的unsafe为NioSocketChannelUnsafe:

代码语言:javascript
复制
io.netty.channel.socket.nio.NioSocketChannel#newUnsafe
 @Override    protected AbstractNioUnsafe newUnsafe() {        return new NioSocketChannelUnsafe();    }

对列表执行pipeline.fireChannelRead(readBuf.get(i)),并在成功后clear了列表。

我们看下pipeline.fireChannelRead(readBuf.get(i)),最终会调用到io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)方法:

代码语言:javascript
复制
private void invokeChannelRead(Object msg) {    if (invokeHandler()) {        try {            ((ChannelInboundHandler) handler()).channelRead(this, msg);        } catch (Throwable t) {            notifyHandlerException(t);        }    } else {        fireChannelRead(msg);    }}

也就是开始执行pipeline了,回想一下我们在初始化的时候是不是往ServerSocketChannel的pipeline中添加了一个ServerBootstrapAcceptor,我们看下 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead:

代码语言:javascript
复制
 @Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    System.out.println(child);    //这里把ServerBootstrap传入的childHandler添加到pipeline    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());    }
    try {        //childGroup对应的是外面传入的workerGroup        //这里的child是指每个连接对应的SocketChannel        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

由上下文可以知道传入的Object msg的实际值是NioSocketChannel。在这里会将初始化时传入的用户自定义的handler放入SocketChannel的pipeline中。

我们接下来看下childGroup.register(child),它会进入到io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):

代码语言:javascript
复制
  @Override    public ChannelFuture register(Channel channel) {        return next().register(channel);    }

从EventLoopGroup初始化那一篇的分析我们可以知道,next()方法是通过chooser从初始化的workerGroup的EventLoop列表中随机选一个。进入io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):

代码语言:javascript
复制
@Override    public ChannelFuture register(Channel channel) {        //new DefaultChannelPromise(channel, this) 这里是将this作为DefaultChannelPromise的executor        return register(new DefaultChannelPromise(channel, this));    }
  @Override    public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        //注意,这里往下面的register(EventLoop eventLoop, final ChannelPromise promise)方法中传入的eventLoop是this,也就是        promise.channel().unsafe().register(this, promise);        return promise;    }    

这里需要注意的是register方法传入的eventLoop是在workerGroup的EventLoop数组中随机选取的一个。

同时由上文我们知道这个unsafe为NioSocketChannelUnsafe,它的register方法调用的是父类中的io.netty.channel.AbstractChannel.AbstractUnsafe#register方法,与NioServerSocketChannel相同。

代码语言:javascript
复制
 @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {    if (eventLoop == null) {        throw new NullPointerException("eventLoop");    }    //查看一下当前channel是不是已经注册过了    if (isRegistered()) {        promise.setFailure(new IllegalStateException("registered to an event loop already"));        return;    }    if (!isCompatible(eventLoop)) {        promise.setFailure(                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));        return;    }    //这里是传入的eventLoop    AbstractChannel.this.eventLoop = eventLoop;    //如果是内部线程    if (eventLoop.inEventLoop()) {        register0(promise);    } else {        try {            //如果不是内部线程            eventLoop.execute(new Runnable() {                @Override                public void run() {                    register0(promise);                }            });        } catch (Throwable t) {            logger.warn(                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",                    AbstractChannel.this, t);            closeForcibly();            closeFuture.setClosed();            safeSetFailure(promise, t);        }    }}

这里又会进入eventLoop.inEventLoop()判断,这里客户端SocketChannel的注册是在reactor线程即bossEventLoopGroup中进行的,而register方法传入的是 workerGroup初始化时的EventLoop列表中随机选择的一个EventLoop,它和reactor线程的eventLoop明显不同。所以它会进入eventLoop.execute(new Runnable())方法。

之前也提到过eventLoop.execute(new Runnable()方法,这里再简单分析一下,它调用的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

代码语言:javascript
复制
  @Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            //worker线程也是一样            startThread();            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }
  • execute方法也是在reactor线程中执行的,所以这里会进入到下面的 startThread()方法,在 startThread()方法中会进入当前worker EventLoop的线程启动工作。
  • 当前worker EventLoop线程启动之后,将执行registor0的Runnable任务放入到EventLoop的队列中。由于启动EventLoop线程后,EventLoop的run方法是一个死循环,它会在发现队列中有任务时将任务取出并执行。

我们进入看一下io.netty.util.concurrent.SingleThreadEventExecutor#startThread:

代码语言:javascript
复制
private void startThread() {        //检查下线程是否已经启动        if (state == ST_NOT_STARTED) {            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {                doStartThread();            }        }    }

会先检查当前EventLoop线程是否已经启动,这是因为workGroup初始化时初始化的是一个线程组,然后里面会初始化一些线程,如果没有指定线程个数,会初始化cpu核数*2个线程。这些线程是要重复使用的,是池化的线程。所以如果是已经启动,就不需要再doStartThread()去启动线程了。io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread:

代码语言:javascript
复制
private void doStartThread() {    assert thread == null;    System.out.println("do start Thread:" + executor);    //executor 默认是ThreadPerTaskExecutor    //对应的workerGroup也有这样一个executor    executor.execute(new Runnable() {        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }
            boolean success = false;            updateLastExecutionTime();            try {                //调用的是io.netty.channel.nio.NioEventLoop.run                System.out.println("thread is:" + Thread.currentThread() + "    SingleThreadEventExecutor.this is: " + SingleThreadEventExecutor.this);                SingleThreadEventExecutor.this.run();                success = true;                ...

SingleThreadEventExecutor.this.run()就是调用NioEventLoop的run方法。这个run方法是一个死循环。关于run方法我们在下文会进行分析。

这时我们再回过头来看register0(promise),实际调用的是io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法:

代码语言:javascript
复制
private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        //默认值是true        boolean firstRegistration = neverRegistered;        //进行注册        doRegister();        //注册结束后设为false        neverRegistered = false;        //将状态改成已注册        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        // 添加用户添加的PendingHandlerAddedTask        pipeline.invokeHandlerAddedIfNeeded();        //确保promise设置成功        safeSetSuccess(promise);        //回调头节点的channelRegistered方法        pipeline.fireChannelRegistered();        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        //如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。        // 最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()        if (isActive()) {            ////如果是首次注册,发起 pipeline 的 fireChannelActive            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                //这种情况会出现在先deregister然后再register时                // ChannelFuture cf = ctx.deregister();                //        cf.addListener(new ChannelFutureListener() {                //            @Override                //            public void operationComplete(ChannelFuture channelFuture) throws Exception {                //                eventLoop.register(channelFuture.channel()).addListener(completeHandler);                //            }                //        });                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

其他的在这里不多说明了,我们看下doRegister()方法,io.netty.channel.nio.AbstractNioChannel#doRegister:

代码语言:javascript
复制
@Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                //这里注册时 ops 设置的是 0,也就是说 SocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                System.out.println("javaChennel:" + javaChannel() + "   selectionKey:" + selectionKey.readyOps());                return;            } catch (CancelledKeyException e) {                if (!selected) {                    // Force the Selector to select now as the "canceled" SelectionKey may still be                    // cached and not removed because no Select.select(..) operation was called yet.                    eventLoop().selectNow();                    selected = true;                } else {                    // We forced a select operation on the selector before but the SelectionKey is still cached                    // for whatever reason. JDK bug ?                    throw e;                }            }        }    }

向eventLoop().unwrappedSelector()上注册SocketChannel,并设置ops为0(目的是方便通过SelectionKey.interestOps进行设置),并将当前SocketChannel作为attachment传入。其中eventLoop().unwrappedSelector()得到的是当前EventLoop独有的 Selector对象,关于这点可以看下NioEventLoop的构造方法:

代码语言:javascript
复制
 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {        //executor为ThreadPerTaskExecutor        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        if (strategy == null) {            throw new NullPointerException("selectStrategy");        }        provider = selectorProvider;        final SelectorTuple selectorTuple = openSelector();        selector = selectorTuple.selector;        System.out.println("NioEventLoop is:" + this + "   selector is:" + selector);        unwrappedSelector = selectorTuple.unwrappedSelector;        selectStrategy = strategy;    }

每个NioEventLoop都会打开一个selector,在初始化workerGroup时,workerGroup中维护着多个NioEventLoop,然后每个SocketChannel都会与其中一个NioEventLoop的selector完成注册关系。其中NioEventLoop的启动 是通过那个EventLoopGroup的executor对象来完成的,NioEventGroup的executor是ThreadPerTaskExecutor,bossGroup对应一个ThreadPerTaskExecutor对象,workerGroup对应另一个ThreadPerTaskExecutor对象。ThreadPerTaskExecutor对象启动EventLoop线程的位置在上面已经讲过的io.netty.util.concurrent.SingleThreadEventExecutor#startThread。

SingleThreadEventExecutor的startThread方法会先判断这个NioEventLoop线程有没有启动,如果已经启动就直接返回,如果没有启动,会进行doStartThread操作:

我们进入SingleThreadEventExecutor.this.run()方法,也就是io.netty.channel.nio.NioEventLoop#run:

代码语言:javascript
复制
 @Override    protected void run() {        for (;;) {            try {                System.out.println("nioEventLoop :" + Thread.currentThread() + " this is:" + this);                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                //关于ioRatio见: https://github.com/netty/netty/issues/6058                //ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }            } catch (Throwable t) {                handleLoopException(t);            }            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                handleLoopException(t);            }        }    }

这里我们主要分析一下select(wakenUp.getAndSet(false))方法和processSelectedKeys()方法,关于runAllTasks()和runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法将在之后的netty的线程和任务中专门分析。

select(wakenUp.getAndSet(false))

代码语言:javascript
复制
 private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        try {            int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);            for (;;) {                //定时任务截止事时间快到了,中断本次轮询                //我们可以看到,NioEventLoop中reactor线程的select操作也是一个for循环,在for循环第一步中,                // 如果发现当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。                // 此外,跳出之前如果发现目前为止还没有进行过select操作(if (selectCnt == 0)),那么就调用一次selectNow(),该方法会立即返回,不会阻塞                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {                    if (selectCnt == 0) {                        selector.selectNow();                        selectCnt = 1;                    }                    break;                }
                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call                // Selector#wakeup. So we need to check task queue again before executing select operation.                // If we don't, the task might be pended until select operation was timed out.                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.                //轮询过程中发现有任务加入,中断本次轮询                //如果不这么做,那么新加入的任务就会被挂起直到select操作超时,如果pipeline添加了IdleStateHandler,那么它可能挂起直到idle超时                //netty为了保证任务队列能够及时执行,在进行阻塞select操作的时候会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环                if (hasTasks() && wakenUp.compareAndSet(false, true)) {                    selector.selectNow();                    selectCnt = 1;                    break;                }                //阻塞式select操作                //执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),                // 于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间
                //如果第一个定时任务的延迟非常长,比如一个小时,那么有没有可能线程一直阻塞在select操作,当然有可能!                // But,只要在这段时间内,有新任务加入,该阻塞就会被释放
                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                    // - Selected something,                    // - waken up by user, or                    // - the task queue has a pending task.                    // - a scheduled task is ready for processing                    break;                }                if (Thread.interrupted()) {                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.                    // As this is most likely a bug in the handler of the user or it's client library we will                    // also log it.                    //                    // See https://github.com/netty/netty/issues/2426                    if (logger.isDebugEnabled()) {                        logger.debug("Selector.select() returned prematurely because " +                                "Thread.currentThread().interrupt() was called. Use " +                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");                    }                    selectCnt = 1;                    break;                }                //解决jdk的nio bug                //                //关于该bug的描述见 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055)                //The problem is around a channel that was originally registered with Selector for i/o gets closed on the server side (due to early client side exit).  But the server side can know about such channel only when it does i/o (read/write) and thereby getting into an IO exception. In this case, (bug 6595055)there are times (erroneous) when server side (selector) did not know the channel is already closed (peer-reset), but continue to do the selection cycle on a key set whose associated channel is alreay closed or invalid.                // Hence, selector's slect(..) keep spinging with zero return without blocking for the timeout period
                //该bug会导致Selector一直空轮询,最终导致cpu 100%,nio server不可用,严格意义上来说,netty没有解决jdk的bug,                // 而是通过一种方式来巧妙地避开了这个bug,具体做法如下                //netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,                // 判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos                // 改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些),                //如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,                // 当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector                long time = System.nanoTime();                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                    // timeoutMillis elapsed without anything selected.                    selectCnt = 1;                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                    // The selector returned prematurely many times in a row.                    // Rebuild the selector to work around the problem.                    logger.warn(                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",                            selectCnt, selector);
                    rebuildSelector();                    selector = this.selector;
                    // Select again to populate selectedKeys.                    selector.selectNow();                    selectCnt = 1;                    break;                }
                currentTimeNanos = time;            }
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                            selectCnt - 1, selector);                }            }        } catch (CancelledKeyException e) {            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);            }            // Harmless exception - log anyway        }    }

processSelectedKeys()

代码语言:javascript
复制
private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }
private void processSelectedKeysOptimized() {        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // null out entry in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.keys[i] = null;
            final Object a = k.attachment();            //在ServerBootStrap初始化创建NioServerSocketChannel,对NioServerSocketChannel进行注册时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用            //在创建SocketChannel,对SocketChannel进行注册时也将SocketChannel的引用放入attachment中去。            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //是否需要再一次select            if (needsToSelectAgain) {                // null out entries in the array to allow to have it GC'ed once the Channel close                // See https://github.com/netty/netty/issues/2363                selectedKeys.reset(i + 1);
                selectAgain();                i = -1;            }        }    }    

在创建SocketChannel,对SocketChannel进行注册时会将SocketChannel的引用放入attachment中去:

代码语言:javascript
复制
 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            final EventLoop eventLoop;            try {                eventLoop = ch.eventLoop();            } catch (Throwable ignored) {                // If the channel implementation throws an exception because there is no event loop, we ignore this                // because we are only trying to determine if ch is registered to this event loop and thus has authority                // to close ch.                return;            }            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is            // still healthy and should not be closed.            // See https://github.com/netty/netty/issues/5125            if (eventLoop != this || eventLoop == null) {                return;            }            // close the channel if the key is not valid anymore            unsafe.close(unsafe.voidPromise());            return;        }
        try {            int readyOps = k.readyOps();            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise            // the NIO JDK channel implementation may throw a NotYetConnectedException.            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking                // See https://github.com/netty/netty/issues/924                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);
                unsafe.finishConnect();            }
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write                ch.unsafe().forceFlush();            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead            // to a spin loop            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                //进入的是                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }

这里我们再回过头看下NioSocketChannel的构造:

代码语言:javascript
复制
 public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        config = new NioSocketChannelConfig(this, socket.socket());    }
  protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {          //读的          super(parent, ch, SelectionKey.OP_READ);      }  protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {          super(parent);          this.ch = ch;          this.readInterestOp = readInterestOp;          try {              ch.configureBlocking(false);          } catch (IOException e) {              try {                  ch.close();              } catch (IOException e2) {                  if (logger.isWarnEnabled()) {                      logger.warn(                              "Failed to close a partially initialized socket.", e2);                  }              }
              throw new ChannelException("Failed to enter non-blocking mode.", e);          }      }      

可以看出SocketChannel的readInterestOp值为SelectionKey.OPREAD。当有OPREAD事件时会进入unsafe.read(),对于SocketChannel来说对应的unsafe是io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe。

分析

分析到这里,我们就不得不把nio的demo拿过来比较一下了:

代码语言:javascript
复制
  while(selector.select() > 0){     Set<SelectionKey> keySet = selector.selectedKeys();     Iterator<SelectionKey> it = keySet.iterator();     SelectionKey key = null;     while(it.hasNext()){
         key = it.next();         /*防止下次select方法返回已处理过的通道*/         it.remove();
         /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/         try{             /*ssc通道只能对链接事件感兴趣*/             if(key.isAcceptable()){
                 /*accept方法会返回一个普通通道,                      每个通道在内核中都对应一个socket缓冲区*/                 SocketChannel sc = ssc.accept();                 sc.configureBlocking(false);

当selector执行一次select()之后,如果selector.select()>0那么就可以通过selector.selectedKeys()获取到keySet。然后对keySet进行处理。

在NioEventLoop的构造方法中会调用openSelector()对selector进行初始化,在openSelector中有这样一段代码:

代码语言:javascript
复制
io.netty.channel.nio.NioEventLoop#openSelector:
 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                try {                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);                    if (cause != null) {                        return cause;                    }                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);                    if (cause != null) {                        return cause;                    }
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);                    return null;                } catch (NoSuchFieldException e) {                    return e;                } catch (IllegalAccessException e) {                    return e;                }            }        });
  //是SelectedSelectionKeySet对象         selectedKeys = selectedKeySet;       

通过反射,将selector的selectedKeys属性和publicSelectedKeys属性修改为selectedKeySet(这样做的好处是用单数组代替hashset,简化结构的同时,线性查找selectkey),并且让selectedKeys = selectedKeySet。那也就是说在每次执行selector.select()之后,都可以通过selectedKeySet来取到selectedKeys,正如processSelectedKeysOptimized中做的那样。这样当有SelectionKey.OP_READ进入时,会 进入 unsafe.read()方法,而SocketChannel的unsafe对应的是io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe,它的read方法:

代码语言:javascript
复制
  @Override        public final void read() {            final ChannelConfig config = config();            final ChannelPipeline pipeline = pipeline();            final ByteBufAllocator allocator = config.getAllocator();            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();            allocHandle.reset(config);
            ByteBuf byteBuf = null;            boolean close = false;            try {                do {                    byteBuf = allocHandle.allocate(allocator);                    allocHandle.lastBytesRead(doReadBytes(byteBuf));                    if (allocHandle.lastBytesRead() <= 0) {                        // nothing was read. release the buffer.                        byteBuf.release();                        byteBuf = null;                        close = allocHandle.lastBytesRead() < 0;                        break;                    }
                    allocHandle.incMessagesRead(1);                    readPending = false;                    pipeline.fireChannelRead(byteBuf);                    byteBuf = null;                } while (allocHandle.continueReading());
                allocHandle.readComplete();                pipeline.fireChannelReadComplete();
                if (close) {                    closeOnRead(pipeline);                }            } catch (Throwable t) {                handleReadException(pipeline, byteBuf, t, close, allocHandle);            } finally {                // Check if there is a readPending which was not processed yet.                // This could be for two reasons:                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method                //                // See https://github.com/netty/netty/issues/2254                if (!readPending && !config.isAutoRead()) {                    removeReadOp();                }            }        }

这里关注下pipeline.fireChannelRead(byteBuf)方法,会进入到SocketChannel的pipeline,从head到tail节点执行一遍handler的channelRead操作。以我的WebSocketServerHandler为例:

代码语言:javascript
复制
  @Override    public void channelRead0(ChannelHandlerContext ctx, Object msg) {        if (msg instanceof FullHttpRequest) {            handleHttpRequest(ctx, (FullHttpRequest) msg);        } else if (msg instanceof WebSocketFrame) {            handleWebSocketFrame(ctx, (WebSocketFrame) msg);        }    }
    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            // Check for closing frame            if (frame instanceof CloseWebSocketFrame) {                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());                return;            }            if (frame instanceof PingWebSocketFrame) {                ctx.write(new PongWebSocketFrame(frame.content().retain()));                return;            }            if (frame instanceof TextWebSocketFrame) {                // Echo the frame                ctx.write(frame.retain());                return;            }            if (frame instanceof BinaryWebSocketFrame) {                // Echo the frame                ctx.write(frame.retain());            }        }

在建立连接后,如果客户端发送了一条消息,会进入到pipeline.fireChannelRead(byteBuf)方法,最终执行到WebSocketServerHandler的channelRead0方法,如果是TextWebSocketFrame, 会使用ctx.write(frame.retain())向客户端写一条数据,这时最终会调用到DefaultPipline的write方法:

代码语言:javascript
复制
  @Override        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {            unsafe.write(msg, promise);        }

最后还是通过unsafe进行write操作的,io.netty.channel.AbstractChannel.AbstractUnsafe#write:

代码语言:javascript
复制
 @Overridepublic final void write(Object msg, ChannelPromise promise) {    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        // If the outboundBuffer is null we know the channel was closed and so        // need to fail the future right away. If it is not null the handling of the rest        // will be done in flush0()        // See https://github.com/netty/netty/issues/2362        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);        // release message now to prevent resource-leak        ReferenceCountUtil.release(msg);        return;    }
    int size;    try {        msg = filterOutboundMessage(msg);        size = pipeline.estimatorHandle().size(msg);        if (size < 0) {            size = 0;        }    } catch (Throwable t) {        safeSetFailure(promise, t);        ReferenceCountUtil.release(msg);        return;    }
    outboundBuffer.addMessage(msg, size, promise);}

你会发现这里只是将要发送的数据放入了outboundBuffer中,并没有往外发送,这是因为在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe的read方法中:

代码语言:javascript
复制
 do {    byteBuf = allocHandle.allocate(allocator);    allocHandle.lastBytesRead(doReadBytes(byteBuf));    if (allocHandle.lastBytesRead() <= 0) {        // nothing was read. release the buffer.        byteBuf.release();        byteBuf = null;        close = allocHandle.lastBytesRead() < 0;        break;    }
    allocHandle.incMessagesRead(1);    readPending = false;    pipeline.fireChannelRead(byteBuf);    byteBuf = null;} while (allocHandle.continueReading());
allocHandle.readComplete();pipeline.fireChannelReadComplete();

在pipeline.fireChannelRead(byteBuf)执行完成之后,还会执行pipeline.fireChannelReadComplete(),这个方法也是从pipeline的head到tail执行,最终会执行WebSocketServerHandler的 channelReadComplete方法:

代码语言:javascript
复制
 @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }

重点在于ctx.flush()方法,它最终会调用io.netty.channel.DefaultChannelPipeline.HeadContext#flush方法:

代码语言:javascript
复制
 @Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {    unsafe.flush();}

io.netty.channel.AbstractChannel.AbstractUnsafe#flush方法:

代码语言:javascript
复制
 @Overridepublic final void flush() {    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }
    outboundBuffer.addFlush();    flush0();}

从buffer中取出数据进行flush。

先调用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#flush0:

代码语言:javascript
复制
 @Overrideprotected final void flush0() {    // Flush immediately only when there's no pending flush.    // If there's a pending flush operation, event loop will call forceFlush() later,    // and thus there's no need to call it now.    if (isFlushPending()) {        return;    }    super.flush0();}
 private boolean isFlushPending() {    SelectionKey selectionKey = selectionKey();    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;}

这里selectionKey.interestOps() & SelectionKey.OPWRITE中selectionKey.interestOps()值为1,SelectionKey.OPWRITE值为4,那么(selectionKey.interestOps() & SelectionKey.OP_WRITE)的值为0。

也就是说如果注册了selectionKey.interestOps为SelectionKey.OP_WRITE,那么将不会在这里进行flush,eventLoop会调用forceFlush来刷新。io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法:

代码语言:javascript
复制
if ((readyOps & SelectionKey.OP_WRITE) != 0) {    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write    ch.unsafe().forceFlush();}

然后调用super.flush0() io.netty.channel.AbstractChannel.AbstractUnsafe#flush0:

代码语言:javascript
复制
  protected void flush0() {    if (inFlush0) {        // Avoid re-entrance        return;    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null || outboundBuffer.isEmpty()) {        return;    }
    inFlush0 = true;
    // Mark all pending write requests as failure if the channel is inactive.    if (!isActive()) {        try {            if (isOpen()) {                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);            } else {                // Do not trigger channelWritabilityChanged because the channel is closed already.                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);            }        } finally {            inFlush0 = false;        }        return;    }
    try {        doWrite(outboundBuffer);    } catch (Throwable t) {

这里doWrite方法是关键,我们跟进去io.netty.channel.socket.nio.NioSocketChannel#doWrite:

代码语言:javascript
复制
 @Override    protected void doWrite(ChannelOutboundBuffer in) throws Exception {        for (;;) {            int size = in.size();            if (size == 0) {                // All written so clear OP_WRITE                clearOpWrite();                break;            }            long writtenBytes = 0;            boolean done = false;            boolean setOpWrite = false;
            // Ensure the pending writes are made of ByteBufs only.            ByteBuffer[] nioBuffers = in.nioBuffers();            int nioBufferCnt = in.nioBufferCount();            long expectedWrittenBytes = in.nioBufferSize();            SocketChannel ch = javaChannel();
            // Always us nioBuffers() to workaround data-corruption.            // See https://github.com/netty/netty/issues/2761            switch (nioBufferCnt) {                case 0:                    // We have something else beside ByteBuffers to write so fallback to normal writes.                    super.doWrite(in);                    return;                case 1:                    // Only one ByteBuf so use non-gathering write                    ByteBuffer nioBuffer = nioBuffers[0];                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                        final int localWrittenBytes = ch.write(nioBuffer);                        if (localWrittenBytes == 0) {                            setOpWrite = true;                            break;                        }                        expectedWrittenBytes -= localWrittenBytes;                        writtenBytes += localWrittenBytes;                        if (expectedWrittenBytes == 0) {                            done = true;                            break;                        }                    }                    break;                default:                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);                        if (localWrittenBytes == 0) {                            setOpWrite = true;                            break;                        }                        expectedWrittenBytes -= localWrittenBytes;                        writtenBytes += localWrittenBytes;                        if (expectedWrittenBytes == 0) {                            done = true;                            break;                        }                    }                    break;            }
            // Release the fully written buffers, and update the indexes of the partially written buffer.            in.removeBytes(writtenBytes);
            if (!done) {                // Did not write all buffers completely.                //如果数据量很大,则去注册OP_WRITE事件                incompleteWrite(setOpWrite);                break;            }        }    }

最终调用的是ch.write进行写出。这里可能有个疑问,为啥正常写没有注册OPWRITE事件? 认为网络不出现阻塞情况下,一直是可以写的,一般我们不注册OPWRITE事件.一般在当Socket的缓冲区繁忙、网络因素等不可写的时候才会注册OP_WRITE事件。

总结

  • 大致流程是reactor线程轮询到有ACCEPT事件后创建NioSocketChannel,然后进入pipeline,在ServerBootStrapAcceptor的channelRead事件中进行 NioSocketChannel的pipeline的添加(添加的是用户在bootstrap层传入的childHandler),然后使用workerGroup进行NioSocketChannel的注册。 注册的核心是围绕着NioEventLoop进行的,它在初始化workerGroup对应的NioEventLoopGroup时创建的了ThreadPerTaskExecutor作为该group的executor,另一方面创建了 cpu核数*2个NioEventLoop,在通过eventloop执行NioSocketChannel的注册时,使用chooser来随机选择一个eventloop来完成,会先确保NioEventLoop都已经启动,具体方式是通过executor调用eventLoop的run方法, run方法是一个死循环,会不停地进行selector的select操作,监听与该EventLoop的selector绑定的channel上的事件。另一方面NioSocketchannel的注册工作会封装成task会被eventLoop的run方法中执行,注册时与nioEventLoop中的selector属性完成了绑定。 写的操作也是通过NioSocketChannel的unsafe类是通过NioSocketChannel的doWrite方法来进行的。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • select(wakenUp.getAndSet(false))
  • processSelectedKeys()
  • 分析
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档