The previous article analyzed the ServerBootstrap initialization flow, which is essentially the initialization of the NioServerSocketChannel's polling thread, the reactor thread. In this article we look at how clients, i.e. NioSocketChannels, are accepted.
From that Bootstrap initialization analysis we know that when the NioServerSocketChannel is registered, eventLoop.execute is invoked:
//the eventLoop passed in from register(...)
AbstractChannel.this.eventLoop = eventLoop;
//already on the event loop thread?
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        //called from an external thread
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
Since the calling thread is the main thread performing initialization, we take the eventLoop.execute branch, which is io.netty.util.concurrent.SingleThreadEventExecutor#execute:
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        //an internal (event loop) thread just enqueues the task
        addTask(task);
    } else {
        //an external thread calls startThread() first: netty checks whether
        //the reactor thread has been started, starts it if not, and only then
        //enqueues the task
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
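To make the two branches concrete, here is a minimal, self-contained sketch (the class name and setup are mine, not from the Netty source) that drives execute() from the main thread — main is not the loop thread, so the external-thread branch runs: startThread() first, then addTask(task):

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class ExecuteFromMainSketch {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        EventLoop loop = group.next();
        // main thread != loop thread, so execute() starts the loop thread
        // (if needed) before queueing the task
        loop.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        group.shutdownGracefully();
    }
}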
startThread calls doStartThread, which ultimately invokes the event loop's run method:
private void startThread() {
    //check whether the thread has already been started
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
/**
 * When SingleThreadEventExecutor runs doStartThread, it calls the internal
 * executor's execute method, wrapping the call to NioEventLoop's run method
 * in a Runnable handed off to a new thread.
 */
private void doStartThread() {
    assert thread == null;
    System.out.println("do start Thread:" + executor); //debug print added for tracing
    //executor defaults to ThreadPerTaskExecutor; the thread it creates is
    //netty's reactor thread entity. By default, ThreadPerTaskExecutor creates
    //a FastThreadLocalThread via DefaultThreadFactory on every execute call,
    //and that thread is netty's reactor thread
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //this calls io.netty.channel.nio.NioEventLoop.run
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            ...
io.netty.channel.nio.NioEventLoop#run:
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //poll the IO events of all channels registered with this
                    //reactor thread's selector. wakenUp indicates whether a
                    //blocking select should be woken up; netty resets it to
                    //false before each new loop iteration. The select
                    //operation itself is examined separately below.
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        //polling done; reset wakenUp for the next round
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            //on ioRatio see: https://github.com/netty/netty/issues/6058
            if (ioRatio == 100) {
                try {
                    //handle channels with pending network IO events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    //drain the task queue
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
            ...
Note that ioRatio only balances time between IO processing and the task queue; it has nothing to do with thread configuration, and the "IO-related" work here is not IO-intensive — it is mostly CPU time spent copying byte blocks. With the default ioRatio of 50, for example, runAllTasks gets the same time budget as the preceding IO pass: ioTime * (100 - 50) / 50 = ioTime.
After each poll we enter processSelectedKeys:
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
The processSelectedKeysOptimized method:
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        //when ServerBootstrap created and registered the NioServerSocketChannel,
        //a reference to it was stored as the attachment; it is retrieved here
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        //do we need to select again?
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
So the NioServerSocketChannel reference stored as the attachment during ServerBootstrap initialization is retrieved here. Next, the key part of io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel):
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    //this is the branch we enter
    unsafe.read();
}
Once the reactor thread has finished initializing, readyOps holds OP_ACCEPT. When a new connection arrives we enter unsafe.read(); this unsafe is of course the NioMessageUnsafe, a point covered in several earlier articles.
Let's look at the read method, io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read:
try {
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }

        allocHandle.incMessagesRead(localRead);
    } while (allocHandle.continueReading());
} catch (Throwable t) {
    exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
Step by step. First, doReadMessages(readBuf):
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
A new NioSocketChannel is created and added to buf, which is a List. One thing to note: the unsafe created when a NioSocketChannel is initialized is a NioSocketChannelUnsafe:
io.netty.channel.socket.nio.NioSocketChannel#newUnsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}
Each element of the list is then passed to pipeline.fireChannelRead(readBuf.get(i)), and the list is cleared afterwards.
Following pipeline.fireChannelRead(readBuf.get(i)), we eventually land in io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object):
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
So the pipeline starts executing. Recall that during initialization we added a ServerBootstrapAcceptor to the ServerSocketChannel's pipeline; let's look at io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    System.out.println(child); //debug print added for tracing
    //add the childHandler passed to ServerBootstrap to the child's pipeline
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        //childGroup is the workerGroup passed in from outside;
        //child is the SocketChannel of each individual connection
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
From the context we know msg is actually the NioSocketChannel. This is where the user-defined childHandler supplied at initialization is placed into the SocketChannel's pipeline.
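For reference, the childHandler being added here is whatever the user configured on the ServerBootstrap. A typical setup looks something like the following sketch (the port and the empty handler are illustrative, not from this article):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChildHandlerSketch {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         // this ChannelInitializer is the "childHandler" that the
         // ServerBootstrapAcceptor adds to every accepted child channel
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
             }
         });
        b.bind(8080); // illustrative port
    }
}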
Next, childGroup.register(child), which enters io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
From the EventLoopGroup-initialization article we know that next() uses the chooser to pick the next EventLoop from the workerGroup's EventLoop array — round-robin rather than random.
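For reference, a condensed excerpt of Netty's generic chooser (io.netty.util.concurrent.DefaultEventExecutorChooserFactory.GenericEventExecutorChooser, trimmed by me):

//plain round-robin over the group's executors; never random
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

With a worker EventLoop chosen, we enter io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):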
@Override
public ChannelFuture register(Channel channel) {
    //new DefaultChannelPromise(channel, this): this EventLoop becomes
    //the executor of the DefaultChannelPromise
    return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    //note: the eventLoop handed to register(EventLoop, ChannelPromise) below
    //is this, i.e. the worker EventLoop chosen by next()
    promise.channel().unsafe().register(this, promise);
    return promise;
}
Again, the eventLoop passed to register is the one chosen from the workerGroup's EventLoop array.
And as established above, this unsafe is a NioSocketChannelUnsafe; its register method is the inherited io.netty.channel.AbstractChannel.AbstractUnsafe#register, the same one used by NioServerSocketChannel.
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    //check whether this channel has already been registered
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    //the eventLoop passed in
    AbstractChannel.this.eventLoop = eventLoop;
    //already on the event loop thread?
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            //called from another thread
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
Here we hit the eventLoop.inEventLoop() check again. The client SocketChannel's registration is running on the reactor thread, i.e. inside the bossEventLoopGroup, while the eventLoop passed to register is one chosen from the workerGroup's EventLoop array — clearly a different loop. So we take the eventLoop.execute(new Runnable()) branch.
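inEventLoop() itself is nothing more than a thread-identity check (quoted from AbstractEventExecutor and SingleThreadEventExecutor):

// AbstractEventExecutor
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

// SingleThreadEventExecutor: compare the caller against the loop's own thread
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}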
We covered eventLoop.execute(new Runnable()) earlier; briefly again, it calls io.netty.util.concurrent.SingleThreadEventExecutor#execute:
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        //an internal thread just enqueues the task
        addTask(task);
    } else {
        //an external thread calls startThread() first: netty checks whether
        //the thread has been started, starts it if not, then enqueues the
        //task — worker threads behave exactly the same way
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
Let's step into io.netty.util.concurrent.SingleThreadEventExecutor#startThread:
private void startThread() {
    //check whether the thread has already been started
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
It first checks whether this EventLoop's thread has already started. The workerGroup initializes a whole group of threads — if no count is specified, it defaults to 2 * the number of CPU cores — and these threads are pooled and reused. So if the thread is already running, there is no need to call doStartThread() again.
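That default comes from MultithreadEventLoopGroup (quoting the constant from Netty's source):

// MultithreadEventLoopGroup
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
        SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads",
                NettyRuntime.availableProcessors() * 2));

io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread: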
private void doStartThread() {
    assert thread == null;
    System.out.println("do start Thread:" + executor); //debug print added for tracing
    //executor defaults to ThreadPerTaskExecutor;
    //the workerGroup has one of these executors as well
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //this calls io.netty.channel.nio.NioEventLoop.run
                System.out.println("thread is:" + Thread.currentThread()
                        + " SingleThreadEventExecutor.this is: " + SingleThreadEventExecutor.this); //debug print
                SingleThreadEventExecutor.this.run();
                success = true;
                ...
SingleThreadEventExecutor.this.run() invokes NioEventLoop's run method, an endless loop that we analyze below.
Now let's return to register0(promise), which is io.netty.channel.AbstractChannel.AbstractUnsafe#register0:
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        //defaults to true
        boolean firstRegistration = neverRegistered;
        //perform the registration
        doRegister();
        //after registering, flip the flags
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        //run any pending handlerAdded tasks added by the user
        pipeline.invokeHandlerAddedIfNeeded();
        //mark the promise successful
        safeSetSuccess(promise);
        //fire channelRegistered starting from the head node
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        //if the Channel is open, doRegister() has completed the registration
        //and the result was set to success; finally, if this is the first
        //registration and the channel is active, fire channelActive
        if (isActive()) {
            //first registration: fire channelActive through the pipeline
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                //this happens when a channel is deregistered and then re-registered:
                // ChannelFuture cf = ctx.deregister();
                // cf.addListener(new ChannelFutureListener() {
                //     @Override
                //     public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //         eventLoop.register(channelFuture.channel()).addListener(completeHandler);
                //     }
                // });
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
We'll skip the rest here and look at doRegister(), io.netty.channel.nio.AbstractNioChannel#doRegister:
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //ops is 0 here: the SocketChannel is merely registered and cannot
            //yet listen for any network operation
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            System.out.println("javaChannel:" + javaChannel()
                    + " selectionKey:" + selectionKey.readyOps()); //debug print added for tracing
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
The SocketChannel is registered with eventLoop().unwrappedSelector() with ops set to 0 (so that the interest set can be configured later via SelectionKey.interestOps), and the current SocketChannel itself is passed as the attachment. The Selector returned by eventLoop().unwrappedSelector() is private to the current EventLoop.
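In plain JDK NIO terms, the same two-step pattern looks roughly like this sketch (method and variable names are mine): register with interest 0 first, then enable OP_READ later — which is essentially what doBeginRead does once the channel turns active:

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class RegisterSketch {
    // step 1: what doRegister() does - interest ops 0, channel attached
    static SelectionKey register(Selector selector, SocketChannel ch, Object attachment)
            throws IOException {
        ch.configureBlocking(false);
        return ch.register(selector, 0, attachment);
    }

    // step 2: what doBeginRead() does once the channel is active
    static void beginRead(SelectionKey key) {
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_READ) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_READ);
        }
    }
}

On the per-EventLoop selector, see NioEventLoop's constructor: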
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    //executor is the ThreadPerTaskExecutor
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    System.out.println("NioEventLoop is:" + this + " selector is:" + selector); //debug print
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
Every NioEventLoop opens its own selector. When the workerGroup is initialized it maintains multiple NioEventLoops, and each SocketChannel is registered with the selector of exactly one of them. The NioEventLoop threads themselves are started through the group's executor: a NioEventLoopGroup's executor is a ThreadPerTaskExecutor — the bossGroup has one ThreadPerTaskExecutor instance, the workerGroup another. The place where it starts an EventLoop thread is the io.netty.util.concurrent.SingleThreadEventExecutor#startThread we saw above.
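ThreadPerTaskExecutor itself is tiny — every execute() call creates and starts a fresh thread via the thread factory. This is essentially the whole class, io.netty.util.concurrent.ThreadPerTaskExecutor:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // one new thread (a FastThreadLocalThread by default) per call -
        // this is the reactor thread entity
        threadFactory.newThread(command).start();
    }
}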
SingleThreadEventExecutor's startThread first checks whether this NioEventLoop's thread is already running; if so it returns immediately, otherwise it proceeds with doStartThread.
Now let's enter SingleThreadEventExecutor.this.run(), i.e. io.netty.channel.nio.NioEventLoop#run:
@Override
protected void run() {
    for (;;) {
        try {
            System.out.println("nioEventLoop :" + Thread.currentThread() + " this is:" + this); //debug print
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //poll the IO events of all channels registered with this
                    //reactor thread's selector; wakenUp is reset to false
                    //before each new loop iteration
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {

                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            //on ioRatio see: https://github.com/netty/netty/issues/6058
            if (ioRatio == 100) {
                try {
                    //handle channels with pending network IO events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    //drain the task queue
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
Here we focus on select(wakenUp.getAndSet(false)) and processSelectedKeys(); runAllTasks() and runAllTasks(ioTime * (100 - ioRatio) / ioRatio) will get their own treatment in a later article on netty's threads and tasks.
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            //if a scheduled task's deadline is nearly due, cut this poll short.
            //The reactor thread's select is itself a for loop; in its first
            //step, if the scheduled-task queue has a task whose deadline is
            //almost reached (<= 0.5ms), break out of the loop. Before
            //breaking, if no select has happened yet (selectCnt == 0), call
            //selectNow() once, which returns immediately without blocking
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            //if tasks arrive during polling, cut this poll short; otherwise a
            //newly added task would hang until the select times out (or, with
            //an IdleStateHandler in the pipeline, until the idle timeout).
            //To keep the task queue responsive, netty checks it before the
            //blocking select; if it is non-empty, one non-blocking selectNow
            //runs and we break out of the loop
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            //blocking select: reaching this point means the task queue is
            //empty and no scheduled task is due within 0.5ms, so block until
            //the first scheduled task's deadline.
            //If the first scheduled task is far away (say an hour), could the
            //thread block in select that long? Certainly - but any task added
            //in the meantime releases the block
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            //working around the JDK NIO epoll bug; for a description see
            //http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055 :
            //a channel registered with the Selector gets closed on the server
            //side (e.g. due to an early client exit), but the server side only
            //learns of it when doing IO (read/write) and hitting an IO
            //exception. There are (erroneous) cases where the selector does
            //not know the channel is already closed (peer reset) and keeps
            //running selection cycles over a key set whose channel is already
            //closed or invalid - select(..) keeps spinning with a zero return
            //instead of blocking for the timeout period.
            //This bug leaves the Selector spinning, eventually driving the
            //CPU to 100% and rendering the nio server unusable. Strictly
            //speaking netty doesn't fix the JDK bug - it sidesteps it:
            //before each selector.select(timeoutMillis) it records the start
            //time currentTimeNanos, and afterwards checks whether the select
            //lasted at least timeoutMillis (rewriting
            //time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
            //as time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
            //may be easier to read). If it did, the poll was genuine and
            //selectCnt is reset; if not, the call returned early without
            //blocking, which may be the JDK spin bug. Once the number of such
            //empty polls passes a threshold (512 by default), the selector is
            //rebuilt
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() +
                    " raised by a Selector {} - JDK bug?", selector, e);
        }
        // Harmless exception - log anyway
    }
}
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        //when the NioServerSocketChannel was created and registered during
        //ServerBootstrap initialization, its reference was stored as the
        //attachment; likewise, when a SocketChannel is created and
        //registered, its reference is stored as the attachment. Here that
        //reference is retrieved
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        //do we need to select again?
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
So for a SocketChannel, too, the reference stored at registration time is what comes back out of the attachment. The full processSelectedKey:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            //this is the branch we enter
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
Now let's circle back to NioSocketChannel's construction:
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    //interested in reads
    super(parent, ch, SelectionKey.OP_READ);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
So a SocketChannel's readInterestOp is SelectionKey.OP_READ. When an OP_READ event fires we enter unsafe.read(); for a SocketChannel the corresponding unsafe is io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.
At this point it's worth putting a plain nio demo alongside for comparison:
while (selector.select() > 0) {
    Set<SelectionKey> keySet = selector.selectedKeys();
    Iterator<SelectionKey> it = keySet.iterator();
    SelectionKey key = null;
    while (it.hasNext()) {

        key = it.next();
        /* remove the key so the next select() doesn't return an already-handled channel */
        it.remove();

        /* if anything goes wrong the client connection is broken, but the server must keep running */
        try {
            /* the ServerSocketChannel's key can only be interested in accept events */
            if (key.isAcceptable()) {

                /* accept() returns an ordinary channel;
                   each channel maps to a socket buffer in the kernel */
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
After the selector performs a select(), if selector.select() > 0 then selector.selectedKeys() yields the key set, which is then processed.
NioEventLoop's constructor calls openSelector() to initialize the selector, and openSelector contains this snippet:
io.netty.channel.nio.NioEventLoop#openSelector:
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
            if (cause != null) {
                return cause;
            }
            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
            if (cause != null) {
                return cause;
            }

            selectedKeysField.set(unwrappedSelector, selectedKeySet);
            publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
            return null;
        } catch (NoSuchFieldException e) {
            return e;
        } catch (IllegalAccessException e) {
            return e;
        }
    }
});
//selectedKeySet is a SelectedSelectionKeySet
selectedKeys = selectedKeySet;
Via reflection, the selector's selectedKeys and publicSelectedKeys fields are swapped out for selectedKeySet — replacing the HashSet with a flat array simplifies the structure and turns selected-key traversal into a linear scan — and the event loop's own selectedKeys field points at the same object. So after every selector.select(), the selected keys can be read straight out of selectedKeySet, exactly as processSelectedKeysOptimized does.
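The replacement set is essentially a Set facade over a flat array. A condensed sketch of io.netty.channel.nio.SelectedSelectionKeySet (trimmed and slightly simplified by me; the real class carries more bookkeeping):

import java.nio.channels.SelectionKey;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;

final class SelectedSelectionKeySetSketch extends AbstractSet<SelectionKey> {
    SelectionKey[] keys = new SelectionKey[1024];
    int size;

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        keys[size++] = o;               // the Selector calls add() on select()
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // double when full
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        return Arrays.asList(keys).subList(0, size).iterator();
    }
}

Thus when a SelectionKey.OP_READ event arrives we enter unsafe.read(); a SocketChannel's unsafe is io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe, and its read method: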
@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
Note pipeline.fireChannelRead(byteBuf): it enters the SocketChannel's pipeline and runs each handler's channelRead from head to tail. Taking my WebSocketServerHandler as an example:
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpRequest) {
        handleHttpRequest(ctx, (FullHttpRequest) msg);
    } else if (msg instanceof WebSocketFrame) {
        handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

    // Check for closing frame
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        // Echo the frame
        ctx.write(frame.retain());
        return;
    }
    if (frame instanceof BinaryWebSocketFrame) {
        // Echo the frame
        ctx.write(frame.retain());
    }
}
After the connection is established, when the client sends a message we end up in pipeline.fireChannelRead(byteBuf) and ultimately in WebSocketServerHandler's channelRead0. For a TextWebSocketFrame, ctx.write(frame.retain()) writes data back to the client, which finally reaches the write method of io.netty.channel.DefaultChannelPipeline.HeadContext:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
So in the end the write goes through unsafe as well — io.netty.channel.AbstractChannel.AbstractUnsafe#write:
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}
Notice that the outgoing data is only appended to the outboundBuffer here — nothing is sent yet. To see why, look back at NioByteUnsafe's read method in io.netty.channel.nio.AbstractNioByteChannel:
do {
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
        // nothing was read. release the buffer.
        byteBuf.release();
        byteBuf = null;
        close = allocHandle.lastBytesRead() < 0;
        break;
    }

    allocHandle.incMessagesRead(1);
    readPending = false;
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
} while (allocHandle.continueReading());

allocHandle.readComplete();
pipeline.fireChannelReadComplete();
After pipeline.fireChannelRead(byteBuf) completes, pipeline.fireChannelReadComplete() runs. It also travels the pipeline from head to tail and finally reaches WebSocketServerHandler's channelReadComplete:
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
}
The key is ctx.flush(), which ultimately lands in io.netty.channel.DefaultChannelPipeline.HeadContext#flush:
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
And io.netty.channel.AbstractChannel.AbstractUnsafe#flush:
@Override
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}
addFlush marks the entries queued in the buffer as flushed, and flush0() then writes them out.
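To picture the two phases, here is a toy model of ChannelOutboundBuffer's queueing (my simplification — the real class uses a single linked Entry chain with unflushed/flushed pointers rather than two queues):

import java.util.ArrayDeque;
import java.util.Queue;

final class OutboundBufferSketch {
    private final Queue<Object> unflushed = new ArrayDeque<>();
    private final Queue<Object> flushed = new ArrayDeque<>();

    void addMessage(Object msg) { unflushed.add(msg); }  // Channel.write(...)
    void addFlush() {                                    // Channel.flush()
        flushed.addAll(unflushed);                       // promote to flushed
        unflushed.clear();
    }
    Object current() { return flushed.peek(); }          // next buffer doWrite drains
    void remove() { flushed.poll(); }                    // after a successful write
}

The split is what lets write() stay cheap on the event loop while doWrite() later drains only the entries an explicit flush has promoted.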
First io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#flush0 runs:
@Override
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}
private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() &&
            (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
In our case selectionKey.interestOps() is 1 (OP_READ) while SelectionKey.OP_WRITE is 4, so (selectionKey.interestOps() & SelectionKey.OP_WRITE) is 0 and no flush is pending.
In other words, if OP_WRITE is currently among the registered interestOps, the flush is not performed here; the event loop will call forceFlush instead, from io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel):
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}
Then super.flush0(), i.e. io.netty.channel.AbstractChannel.AbstractUnsafe#flush0:
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
    ...
doWrite is the crux; let's follow it into io.netty.channel.socket.nio.NioSocketChannel#doWrite:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            //there is still data left, so register for the OP_WRITE event
            incompleteWrite(setOpWrite);
            break;
        }
    }
}
Ultimately the bytes go out through ch.write. One question remains: why doesn't a normal write register the OP_WRITE event? Because as long as the network isn't congested, the socket is assumed to be writable, so OP_WRITE is normally left unregistered; it is registered only when the socket's send buffer is busy, or network conditions otherwise make the channel temporarily unwritable.
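A minimal plain-NIO sketch of that convention (class and method names are mine): OP_WRITE is registered only when the kernel send buffer stops accepting data, and cleared as soon as the backlog drains:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

final class WriteInterestSketch {
    // try to write; manage OP_WRITE based on whether the socket accepted data
    static void writeWithBackpressure(SocketChannel ch, SelectionKey key, ByteBuffer buf)
            throws IOException {
        ch.write(buf);
        if (buf.hasRemaining()) {
            // send buffer full: let the selector wake us when it drains
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        } else {
            // all written: clear OP_WRITE, or select() would spin on "writable"
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}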