Bootstrap是netty的启动配置类,里面涉及到大量的netty的初始化操作,本篇来分析下它的启动流程
public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; }
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketServerInitializer(sslCtx));
Channel ch = bootstrap.bind(PORT).sync().channel();
System.out.println("Open your web browser and navigate to " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
下面我们一一来看看。
public ServerBootstrap() { }
属性主要有:
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler;
/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
可以看到childGroup也就是group方法传入的workerGroup是赋值给ServerBootstrap的childGroup属性的。我们进入 super.group(parentGroup)也就是io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup):
/** * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created * {@link Channel} */ @SuppressWarnings("unchecked") public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
我们来看一下AbstractBootstrap的类概览,主要看一下属性和构造:
可以看到bossGroup是赋值给AbstractBootstrap的group属性的。
关于这个方法在netty源码分析一之NioServerSocketChannel中已经完整地介绍过,这里不再赘述,它的目的是创建一个泛型为NioServerSocketChannel的ReflectiveChannelFactory。
/** * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. */ public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }
在这里的主要作用是赋值给ServerBootstrap的属性childHandler。
先看一下bind方法:
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
继续往下看:
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
我们继续来看doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) { //初始化并进行注册 final ChannelFuture regFuture = initAndRegister(); //获取channel final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}
这个方法里面我们主要需要分析三点: initAndRegister()方法、ChannelFuture和doBind0(regFuture, channel, localAddress, promise)方法。
这个方法的主要作用是初始化和注册。我们来看具体代码io.netty.bootstrap.AbstractBootstrap#initAndRegister:
final ChannelFuture initAndRegister() { //这个channel是ServerSocketChannel Channel channel = null; try { channel = channelFactory.newChannel(); System.out.println("channel:" + channel); //1.初始化channel init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //2. register channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
return regFuture;}
这段代码主要分三步:
1. 使用channelFactory创建ServerSocketChannel2. init(channel) 初始化channel3. config().group().register(channel)初始化channel
创建channel的部分我们在分析一中已经介绍过了,接下来我们来一步步地看后面的步骤: 首先init(channel),实际调用的是io.netty.bootstrap.ServerBootstrap#init方法:
@Overridevoid init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { //给ServerSocketChannel设置一些options setChannelOptions(channel, options, logger); }
final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { //将在ServerBootstrap中携带的attr放入到channel的attr中 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipeline,这时候只有head和tail节点,还没有加入用户自定义的handler ChannelPipeline p = channel.pipeline(); //这里的childGroup是指外层传入的workerGroup final EventLoopGroup currentChildGroup = childGroup; //这个是外层传入的childHandler(new WebSocketServerInitializer(sslCtx)) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { //设置options currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { //设置attrs currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //向传入的ServerSocketChannel的pipline中添加handler,这里就像在最外层通过initializer方式添加channel handler一样,这里相当于添加了一个acceptor p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { System.out.println("channel:" + ch + " start init channel!"); //这里获取到的相当于是nio中的SocketChannel final ChannelPipeline pipeline = ch.pipeline(); //在SocketChannel中添加bootstrap.handler(),我们的示例中添加的是childHandler(new WebSocketServerInitializer(sslCtx)),所以这里为null ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //调用SocketChannel的eventLoop,向里面添加一个往pipeline中添加ServerBootstrapAcceptor的内部任务 //这个是在回调时调用的,所以是内部的eventLoop调用 ch.eventLoop().execute(new Runnable() { @Override public void run() { //这里将ServerSocketChannel和bootstrap传入的workerGroup,childHandler等都传入 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
在里面进行的是一系列的初始化操作,其中在ServerSocketChannel的pipeline中添加了一个ChannelInitializer,在ChannelInitializer内部进行的是 往每个连接对应的SocketChannel的pipeline中添加一个ServerBootstrapAcceptor。关于ServerBootstrapAcceptor我们后面再专门来讲一讲。
这里我们先来看一看config().group().register(channel),也就是ServerSocketChannel的注册部分: 我们先看一看io.netty.bootstrap.AbstractBootstrapConfig#group方法:
public final EventLoopGroup group() { //对bootstrap进行了包装 return bootstrap.group(); }
这里返回的是bootstrap.group也就是外面传入的bossGroup,即大名鼎鼎的reactor线程。这里的操作实际上是用reactor线程来注册ServerSocketChannel:
看过源码分析二的会知道,这里会进入到io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)方法:
@Override public ChannelFuture register(Channel channel) { //new DefaultChannelPromise(channel, this) 这里是将this作为DefaultChannelPromise的executor return register(new DefaultChannelPromise(channel, this)); }
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //register方法传入的EventLoop就是当前的这个boosEventGroup对应的reactor线程 promise.channel().unsafe().register(this, promise); return promise; }
注意这里的channel对应的是NioServerSocketChannel,它的unsafe方法为io.netty.channel.nio.AbstractNioChannel#unsafe:
@Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); }
然后调用的是io.netty.channel.AbstractChannel#unsafe:
@Override public Unsafe unsafe() { return unsafe; }
这里的unsafe初始化在源码分析一中已经分析过,这里简单提一下:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
最终对应的是io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe方法:
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
register调用的也就是NioMessageUnsafe的register方法,它对应的是io.netty.channel.AbstractChannel.AbstractUnsafe的register方法,其中AbstractUnsafe是AbstractChannel的内部类,register方法为:
/** * 1. 首先通过isRegistered() 判断该 Channel 是否已经注册到 EventLoop 中; * 2. 通过 eventLoop.inEventLoop() 来判断当前线程是否为该 EventLoop 自身发起的,如果是,则调用 register0() 直接注册; * 3. 如果不是,说明该 EventLoop 中的线程此时没有执行权,则需要新建一个线程,单独封装一个 Task,而该 Task 的主要任务则是执行 register0()。 * @param eventLoop * @param promise */ @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } //查看一下当前channel是不是已经注册过了 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //这里是传入的eventLoop AbstractChannel.this.eventLoop = eventLoop; //如果是内部线程 if (eventLoop.inEventLoop()) { register0(promise); } else { try { //如果不是内部线程 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
如果是内部线程,直接执行register0方法,如果不是则进入下面的方法,这里我们先分析下eventLoop.inEventLoop()对应的是io.netty.util.concurrent.AbstractEventExecutor#inEventLoop:
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
这里的Thread.currentThread()为bind对应的线程即main线程,io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop:
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
可以看到,其实这里使用的不是内部线程,也就是说会进入下面的方法。我们先看一看eventLoop.execute(new Runnable())方法对应的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); }
boolean inEventLoop = inEventLoop(); if (inEventLoop) { //内部线程在往任务队列中添加任务时执行addTask addTask(task); } else { //外部线程在往任务队列里面添加任务的时候执行 startThread() , // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务 startThread(); //将传入的任务添加到队列中 addTask(task); if (isShutdown() && removeTask(task)) { reject(); } }
if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
这里会进入 startThread()和addTask(task)方法。我们这里多说几句,继续看startThread方法:
private void startThread() { //检查下线程是否已经启动 if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } }}
doStartThread方法:
/** * SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法, * 将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行 */private void doStartThread() { assert thread == null; //executor 默认是ThreadPerTaskExecutor //该线程就是executor创建,对应netty的reactor线程实体 //默认情况下,ThreadPerTaskExecutor 在每次执行execute // 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体 executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }
boolean success = false; updateLastExecutionTime(); try { //调用的是io.netty.channel.nio.NioEventLoop.run SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } ........
executor.execute方法调用的是:
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
这里我们主要分析一下SingleThreadEventExecutor.this.run()调用的是io.netty.channel.nio.NioEventLoop#run:
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件 //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前, // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看 select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup(); } default: // fallthrough }
cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { //处理产生网络IO事件的channel processSelectedKeys(); } finally { // Ensure we always run tasks. //处理任务队列 runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
关于这个方法,我们后面会专门来讲,这里主要看一下processSelectedKeys()方法:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
最终调用的是:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }}
我们知道,NioServerSocketChannel注册的是ACCEPT事件,当有read事件进入时,会进入unsafe.read(),调用的是NioMessageUnsafe的read():
private final List<Object> readBuf = new ArrayList<Object>();
public void read() { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete();}
接下来,调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead()。
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel());
try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t);
try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } }
return 0; }
继续来讲关于register0方法:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } //默认值是true boolean firstRegistration = neverRegistered; //进行注册 doRegister(); //注册结束后设为false neverRegistered = false; //将状态改成已注册 registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 添加用户添加的PendingHandlerAddedTask pipeline.invokeHandlerAddedIfNeeded(); //确保promise设置成功 safeSetSuccess(promise); //回调头节点的channelRegistered方法 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. //如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。 // 最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive() if (isActive()) { ////如果是首次注册,发起 pipeline 的 fireChannelActive if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 //如果设置了autoRead,这里就开始读取数据了。注意下ServerBootstrapAcceptor中有设置channel为autoRead true的操作 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}
也就是说如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive(),而如果是注册过的且处于激活状态则会调用beginRead方法,注册ACCEPT事件。
关于auto read,我们需要关注下ServerBootstrapAcceptor部分的操作:
接下来我们看一看doRegister方法,io.netty.channel.nio.AbstractNioChannel#doRegister方法如下:
@Overrideprotected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作,这样做的目的是(摘自《Netty权威指南(第二版)》): // //注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。 //通过 SelectionKey.interestOps(int ops) 方法可以方便地修改监听操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量 // selectionKey 赋值。 //由于这里 ops 设置为 0,所以还不能监听读写事件。调用 doRegister()后,然后调用pipeline.invokeHandlerAddedIfNeeded();, // 这个时候控制台会出现 loggin-handlerAdded,内部如何调用, // 我们在剖析 pipeline 时再做详细分析。然后将注册结果设置为成功(safeSetSuccess(promise))。调用 pipeline.fireChannelRegistered(); selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}
这里NioServerSocketChannel向selector上注册的ops是0;
关于beginRead()方法,最终调用的是io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead,在这个方法内部调用的是io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead方法, 继续往下找,调用的是io.netty.channel.nio.AbstractNioChannel#doBeginRead:
@Overrideprotected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called //这里是nio的selectionKey final SelectionKey selectionKey = this.selectionKey; //selectionKey.isValid()默认是true,当调用了cancle方法后是false if (!selectionKey.isValid()) { return; }
readPending = true; //感兴趣的操作码 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }}
在上面的doRegister中NioServerSocketChannel向selector注册的ops为0,而NioServerSocketChannel的interestOps为SelectionKey.OPACCEPT(具体可以看看关于NioServerSocketChannel的那篇源码分析), 那么在这里的计算就是interestOps & readInterestOp = 0 & SelectionKey.OPACCEPT结果为0,而interestOps | readInterestOp= 0 | OPACCEPT 结果为SelectionKey.OPACCEPT。
是jdk Future的子类,主要提供了添加一些监听器,进行同步、等待等操作的方法:
我们来看下io.netty.bootstrap.AbstractBootstrap#doBind0方法:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { //在上面的注册工作完成之后,才进行bind操作 if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}
我们看下channel的bind方法,io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):
@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise);}
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):
@Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
继续往下看,调到io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):
@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; }
final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise;}
io.netty.channel.AbstractChannelHandlerContext#invokeBind:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); }}
然后进入io.netty.channel.DefaultChannelPipeline.HeadContext#bind:
@Overridepublic void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise);}
然后进入的是NioMessageUnsafe,它的bind方法是io.netty.channel.AbstractChannel.AbstractUnsafe#bind:
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }
// See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } //绑定之后就会激活,这时候wasActive返回的应该是false,执行doBind方法之后isActive()返回的应该是true boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //在绑定之后执行pipeline.fireChannelActive方法 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); }
safeSetSuccess(promise);}
这里的isActive方法对应的是io.netty.channel.socket.nio.NioServerSocketChannel#isActive:
@Override public boolean isActive() { return javaChannel().socket().isBound(); }
调用的是jdk底层的ServerSocket的isBound方法,可见在执行doBind方法之前wasActive的值为false,执行了doBind方法之后,这个值为true,然后会执行pipeline.fireChannelActive()。
这里的doBind方法调用的是io.netty.channel.socket.nio.NioServerSocketChannel#doBind:
@Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
可以看出,最终还是调jdk底层的ServerSocketChannel来进行bind操作的。
关于ServerBootstrapAcceptor,我们关注一下它的channelRead方法:
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) { //关于这里为什么传入的是Channel,上面已经提到过 final Channel child = (Channel) msg; //这里把ServerBootstrap传入的childHandler添加到pipeline中 child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }
try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}
这里主要分析三点:
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
这里的next()对应的是什么呢,我们来一层层看看:
@Override public EventLoop next() { return (EventLoop) super.next(); }
然后进去:
@Override public EventExecutor next() { return chooser.next(); }
这个chooser就是我们在源码分析二中重点讲过的EventExecutorChooserFactory.EventExecutorChooser,它是在new NioEventLoopGroup(nThreads)的时候初始化的, 初始化的数组Executor[nThreads],它的next()方法是从这个Exector数组中顺序随机取一个Exector,也就是取一个EventLoop。
然后我们分析一下register方法,调用的是NioEventLoop父类的io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):
@Overridepublic ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}
可以看到最后调用的是channel的unsafe来进行注册的操作的。
服务端的启动流程到这里就分析结束了,这里我们总结一下,并对其中比较关键的几个点进行一个总结。初始化过程主要分为以下几步: 初始化NioEventLoopGroup,里面包括NioEventLoop的初始化操作。 创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等。初始化server对应的channel, 设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件。 初始化NioServerSocketChannel,并初始化unsafe(NioMessageUnsafe)和pipeline,并在pipeline中添加ServerBootstrapAcceptor等。 注册时通过selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)将NioServerSocketChannel添加到SelectionKey的attachment中。如果是第一次注册且处于激活状态则会触发pipeline的fireChannelActive()方法,否则如果设置了autoRead则会触发beginRead方法并向selector上注册SelectionKey.OP_ACCEPT。 NioEventLoop的execute方法在外部线程调用时会先启动startThread来启动一个线程,在线程中调用run()方法,而run方法是一个死循环,不停地轮询selectKey的信息,这里只是简单提一下,之后会详细分析。需要注 意的是初始化是main线程执行的,也就是说在初始化的时候就会调用startThread方法打开reactor线程,然后开始select selectKey的事件。 startThread开启的线程是通过ThreadPerTaskExecutor.execute来完成的,ThreadPerTaskExecutor 在每次执行execute方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体。 NioEventLoop的run方法中会循环轮询processSelectedKey方法,在processSelectedKey方法中监听到ACCEPT事件后会调用 unsafe.read()方法,在 unsafe.read()方法中会调用NioMessageUnsafe的read方法。 在NioMessageUnsafe的read方法中会为每个客户端创建NioSocketChannel并加入到List readBuf中,然后触发pipeline的fireChannelRead方法,最终将NioSocketChannel对象传入到 void channelRead(ChannelHandlerContext ctx, Object msg)的msg参数, 这也是为什么在ServerBootstrapAcceptor的channelRead方法第二个参数是NioSocketChannel对象。 ServerBootstrapAcceptor是reactor线程的acceptor,IO操作分发给具体channel的都是通过acceptor线程来进行的。