netty源码分析之三Bootstrap初始化

Bootstrap是netty的启动配置类,里面涉及到大量的netty的初始化操作,本篇来分析下它的启动流程

netty启动代码

public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .childHandler(new WebSocketServerInitializer(sslCtx));
            Channel ch = bootstrap.bind(PORT).sync().channel();
            System.out.println("Open your web browser and navigate to " +                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
            ch.closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
  • ServerBootstrap主要的启动配置类
  • ServerBootstrap.bind方法主要处理一些端口绑定,channel注册等操作,是核心方法

下面我们一一来看看。

ServerBootStrap初始化

1. 构造方法:

public ServerBootstrap() { }

属性主要有:

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);    private volatile EventLoopGroup childGroup;    private volatile ChannelHandler childHandler;

2. bootstrap.group(bossGroup, workerGroup)

  /**     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and     * {@link Channel}'s.     */    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {        super.group(parentGroup);        if (childGroup == null) {            throw new NullPointerException("childGroup");        }        if (this.childGroup != null) {            throw new IllegalStateException("childGroup set already");        }        this.childGroup = childGroup;        return this;    }

可以看到childGroup也就是group方法传入的workerGroup是赋值给ServerBootstrap的childGroup属性的。我们进入 super.group(parentGroup)也就是io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup):

 /**     * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created     * {@link Channel}     */    @SuppressWarnings("unchecked")    public B group(EventLoopGroup group) {        if (group == null) {            throw new NullPointerException("group");        }        if (this.group != null) {            throw new IllegalStateException("group set already");        }        this.group = group;        return (B) this;    }

我们来看一下AbstractBootstrap的类概览,主要看一下属性和构造:

可以看到bossGroup是赋值给AbstractBootstrap的group属性的。

3. bootstrap.channel(NioServerSocketChannel.class)

关于这个方法在netty源码分析一之NioServerSocketChannel中已经完整地介绍过,这里不再赘述,它的目的是创建一个泛型为NioServerSocketChannel的ReflectiveChannelFactory。

4. bootstrap.childHandler(new WebSocketServerInitializer(sslCtx))

    /**     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.     */    public ServerBootstrap childHandler(ChannelHandler childHandler) {        if (childHandler == null) {            throw new NullPointerException("childHandler");        }        this.childHandler = childHandler;        return this;    }

在这里的主要作用是赋值给ServerBootstrap的属性childHandler。

5. bootstrap.bind(port)

先看一下bind方法:

  /**     * Create a new {@link Channel} and bind it.     */    public ChannelFuture bind(int inetPort) {        return bind(new InetSocketAddress(inetPort));    }

继续往下看:

  /**     * Create a new {@link Channel} and bind it.     */    public ChannelFuture bind(SocketAddress localAddress) {        validate();        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        return doBind(localAddress);    }

我们继续来看doBind方法:

private ChannelFuture doBind(final SocketAddress localAddress) {    //初始化并进行注册    final ChannelFuture regFuture = initAndRegister();    //获取channel    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }
    if (regFuture.isDone()) {        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // Registration future is almost always fulfilled already, but just in case it's not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}

这个方法里面我们主要需要分析三点: initAndRegister()方法、ChannelFuture和doBind0(regFuture, channel, localAddress, promise)方法。

5.1 initAndRegister()

这个方法的主要作用是初始化和注册。我们来看具体代码io.netty.bootstrap.AbstractBootstrap#initAndRegister:

final ChannelFuture initAndRegister() {    //这个channel是ServerSocketChannel    Channel channel = null;    try {        channel = channelFactory.newChannel();        System.out.println("channel:" + channel);        //1.初始化channel        init(channel);    } catch (Throwable t) {        if (channel != null) {            // channel can be null if newChannel crashed (eg SocketException("too many open files"))            channel.unsafe().closeForcibly();        }        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);    }    //2. register channel    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }
    // If we are here and the promise is not failed, it's one of the following cases:    // 1) If we attempted registration from the event loop, the registration has been completed at this point.    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.    // 2) If we attempted registration from the other thread, the registration request has been successfully    //    added to the event loop's task queue for later execution.    //    i.e. It's safe to attempt bind() or connect() now:    //         because bind() or connect() will be executed *after* the scheduled registration task is executed    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;}

这段代码主要分三步:

1. 使用channelFactory创建ServerSocketChannel2. init(channel) 初始化channel3. config().group().register(channel)初始化channel

创建channel的部分我们在分析一中已经介绍过了,接下来我们来一步步地看后面的步骤: 首先init(channel),实际调用的是io.netty.bootstrap.ServerBootstrap#init方法:

@Overridevoid init(Channel channel) throws Exception {    final Map<ChannelOption<?>, Object> options = options0();    synchronized (options) {        //给ServerSocketChannel设置一些options        setChannelOptions(channel, options, logger);    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();    synchronized (attrs) {        //将在ServerBootstrap中携带的attr放入到channel的attr中        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {            @SuppressWarnings("unchecked")            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();            channel.attr(key).set(e.getValue());        }    }    //获取channel的pipeline,这时候只有head和tail节点,还没有加入用户自定义的handler    ChannelPipeline p = channel.pipeline();    //这里的childGroup是指外层传入的workerGroup    final EventLoopGroup currentChildGroup = childGroup;    //这个是外层传入的childHandler(new WebSocketServerInitializer(sslCtx))    final ChannelHandler currentChildHandler = childHandler;    final Entry<ChannelOption<?>, Object>[] currentChildOptions;    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;    synchronized (childOptions) {        //设置options        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));    }    synchronized (childAttrs) {        //设置attrs        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));    }    //向传入的ServerSocketChannel的pipline中添加handler,这里就像在最外层通过initializer方式添加channel handler一样,这里相当于添加了一个acceptor    p.addLast(new ChannelInitializer<Channel>() {        @Override        public void initChannel(final Channel ch) throws Exception {            System.out.println("channel:" + ch + " start init channel!");            //这里获取到的相当于是nio中的SocketChannel            final ChannelPipeline pipeline = ch.pipeline();            //在SocketChannel中添加bootstrap.handler(),我们的示例中添加的是childHandler(new WebSocketServerInitializer(sslCtx)),所以这里为null            ChannelHandler handler = config.handler();            if (handler != null) {                pipeline.addLast(handler);            }            //调用SocketChannel的eventLoop,向里面添加一个往pipeline中添加ServerBootstrapAcceptor的内部任务            //这个是在回调时调用的,所以是内部的eventLoop调用            ch.eventLoop().execute(new Runnable() {                @Override                public void run() {                //这里将ServerSocketChannel和bootstrap传入的workerGroup,childHandler等都传入                    pipeline.addLast(new ServerBootstrapAcceptor(                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}

在里面进行的是一系列的初始化操作,其中在ServerSocketChannel的pipeline中添加了一个ChannelInitializer,在ChannelInitializer内部进行的是 往每个连接对应的SocketChannel的pipeline中添加一个ServerBootstrapAcceptor。关于ServerBootstrapAcceptor我们后面再专门来讲一讲。

这里我们先来看一看config().group().register(channel),也就是ServerSocketChannel的注册部分: 我们先看一看io.netty.bootstrap.AbstractBootstrapConfig#group方法:

  public final EventLoopGroup group() {        //对bootstrap进行了包装        return bootstrap.group();    }

这里返回的是bootstrap.group也就是外面传入的bossGroup,即大名鼎鼎的reactor线程。这里的操作实际上是用reactor线程来注册ServerSocketChannel:

看过源码分析二的会知道,这里会进入到io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)方法:

    @Override    public ChannelFuture register(Channel channel) {        //new DefaultChannelPromise(channel, this) 这里是将this作为DefaultChannelPromise的executor        return register(new DefaultChannelPromise(channel, this));    }
    @Override    public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        //register方法传入的EventLoop就是当前的这个boosEventGroup对应的reactor线程        promise.channel().unsafe().register(this, promise);        return promise;    }

注意这里的channel对应的是NioServerSocketChannel,它的unsafe方法为io.netty.channel.nio.AbstractNioChannel#unsafe:

  @Override    public NioUnsafe unsafe() {        return (NioUnsafe) super.unsafe();    }

然后调用的是io.netty.channel.AbstractChannel#unsafe:

 @Override    public Unsafe unsafe() {        return unsafe;    }

这里的unsafe初始化在源码分析一中已经分析过,这里简单提一下:

  protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }

最终对应的是io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe方法:

 @Override    protected AbstractNioUnsafe newUnsafe() {        return new NioMessageUnsafe();    }

register调用的也就是NioMessageUnsafe的register方法,它对应的是io.netty.channel.AbstractChannel.AbstractUnsafe的register方法,其中AbstractUnsafe是AbstractChannel的内部类,register方法为:

   /**     * 1. 首先通过isRegistered() 判断该 Channel 是否已经注册到 EventLoop 中;     * 2. 通过 eventLoop.inEventLoop() 来判断当前线程是否为该 EventLoop 自身发起的,如果是,则调用 register0() 直接注册;     * 3. 如果不是,说明该 EventLoop 中的线程此时没有执行权,则需要新建一个线程,单独封装一个 Task,而该 Task 的主要任务则是执行 register0()。     * @param eventLoop     * @param promise     */    @Override    public final void register(EventLoop eventLoop, final ChannelPromise promise) {        if (eventLoop == null) {            throw new NullPointerException("eventLoop");        }        //查看一下当前channel是不是已经注册过了        if (isRegistered()) {            promise.setFailure(new IllegalStateException("registered to an event loop already"));            return;        }        if (!isCompatible(eventLoop)) {            promise.setFailure(                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));            return;        }        //这里是传入的eventLoop        AbstractChannel.this.eventLoop = eventLoop;        //如果是内部线程        if (eventLoop.inEventLoop()) {            register0(promise);        } else {            try {                //如果不是内部线程                eventLoop.execute(new Runnable() {                    @Override                    public void run() {                        register0(promise);                    }                });            } catch (Throwable t) {                logger.warn(                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",                        AbstractChannel.this, t);                closeForcibly();                closeFuture.setClosed();                safeSetFailure(promise, t);            }        }    }

如果是内部线程,直接执行register0方法,如果不是则进入下面的方法,这里我们先分析下eventLoop.inEventLoop()对应的是io.netty.util.concurrent.AbstractEventExecutor#inEventLoop:

   @Override    public boolean inEventLoop() {        return inEventLoop(Thread.currentThread());    }

这里的Thread.currentThread()为bind对应的线程即main线程,io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop:

  @Override    public boolean inEventLoop(Thread thread) {        return thread == this.thread;    }

可以看到,其实这里使用的不是内部线程,也就是说会进入下面的方法。我们先看一看eventLoop.execute(new Runnable())方法对应的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

 @Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            startThread();            //将传入的任务添加到队列中            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }

这里会进入 startThread()和addTask(task)方法。我们这里多说几句,继续看startThread方法:

private void startThread() {    //检查下线程是否已经启动    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            doStartThread();        }    }}

doStartThread方法:

  /** * SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法, * 将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行 */private void doStartThread() {    assert thread == null;    //executor 默认是ThreadPerTaskExecutor    //该线程就是executor创建,对应netty的reactor线程实体    //默认情况下,ThreadPerTaskExecutor 在每次执行execute    // 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体    executor.execute(new Runnable() {        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }
            boolean success = false;            updateLastExecutionTime();            try {                //调用的是io.netty.channel.nio.NioEventLoop.run                SingleThreadEventExecutor.this.run();                success = true;            } catch (Throwable t) {                logger.warn("Unexpected exception from an event executor: ", t);            } finally {                for (;;) {                    int oldState = state;                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {                        break;                    }                }    ........

executor.execute方法调用的是:

  /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */    static final class ThreadPerTaskExecutor implements Executor {        public void execute(Runnable r) { new Thread(r).start(); }    }

这里我们主要分析一下SingleThreadEventExecutor.this.run()调用的是io.netty.channel.nio.NioEventLoop#run:

 @Override    protected void run() {        for (;;) {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }            } catch (Throwable t) {                handleLoopException(t);            }            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                handleLoopException(t);            }        }    }

关于这个方法,我们后面会专门来讲,这里主要看一下processSelectedKeys()方法:

   private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

最终调用的是:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();    int readyOps = k.readyOps();    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {        unsafe.read();    }}

我们知道,NioServerSocketChannel注册的是ACCEPT事件,当有read事件进入时,会进入unsafe.read(),调用的是NioMessageUnsafe的read():

private final List<Object> readBuf = new ArrayList<Object>();
public void read() {    assert eventLoop().inEventLoop();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    do {        int localRead = doReadMessages(readBuf);        if (localRead == 0) {            break;        }        if (localRead < 0) {            closed = true;            break;        }    } while (allocHandle.continueReading());    int size = readBuf.size();    for (int i = 0; i < size; i ++) {        pipeline.fireChannelRead(readBuf.get(i));    }    readBuf.clear();    pipeline.fireChannelReadComplete();}

接下来,调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead()。

  • 我们看下io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,向readBuf里放入的是NioSocketChannel对象,这个在我们讲客户端接入时再详细分析:
 @Override    protected int doReadMessages(List<Object> buf) throws Exception {        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {            if (ch != null) {                buf.add(new NioSocketChannel(this, ch));                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }
        return 0;    }
  • 这里尤其需要注意的一点是pipeline.fireChannelRead(readBuf.get(i))方法的参数其实是一个channel,对应nio中的nioSocketChannel,在下面的ServerBootstrapAcceptor的channelRead(Object msg)方法中传入的msg就是它。

继续来讲关于register0方法:

 private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        //默认值是true        boolean firstRegistration = neverRegistered;        //进行注册        doRegister();        //注册结束后设为false        neverRegistered = false;        //将状态改成已注册        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        // 添加用户添加的PendingHandlerAddedTask        pipeline.invokeHandlerAddedIfNeeded();        //确保promise设置成功        safeSetSuccess(promise);        //回调头节点的channelRegistered方法        pipeline.fireChannelRegistered();        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        //如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。        // 最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()        if (isActive()) {            ////如果是首次注册,发起 pipeline 的 fireChannelActive            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                //如果设置了autoRead,这里就开始读取数据了。注意下ServerBootstrapAcceptor中有设置channel为autoRead true的操作                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

也就是说如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive(),而如果是注册过的且处于激活状态则会调用beginRead方法,注册ACCEPT事件。

关于auto read,我们需要关注下ServerBootstrapAcceptor部分的操作:

接下来我们看一看doRegister方法,io.netty.channel.nio.AbstractNioChannel#doRegister方法如下:

 @Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            //这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作,这样做的目的是(摘自《Netty权威指南(第二版)》):            //            //注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。            //通过 SelectionKey.interestOps(int ops) 方法可以方便地修改监听操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量            // selectionKey 赋值。            //由于这里 ops 设置为 0,所以还不能监听读写事件。调用 doRegister()后,然后调用pipeline.invokeHandlerAddedIfNeeded();,            // 这个时候控制台会出现 loggin-handlerAdded,内部如何调用,            // 我们在剖析 pipeline 时再做详细分析。然后将注册结果设置为成功(safeSetSuccess(promise))。调用 pipeline.fireChannelRegistered();            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

这里NioServerSocketChannel向selector上注册的ops是0;

关于beginRead()方法,最终调用的是io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead,在这个方法内部调用的是io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead方法, 继续往下找,调用的是io.netty.channel.nio.AbstractNioChannel#doBeginRead:

@Overrideprotected void doBeginRead() throws Exception {    // Channel.read() or ChannelHandlerContext.read() was called    //这里是nio的selectionKey    final SelectionKey selectionKey = this.selectionKey;    //selectionKey.isValid()默认是true,当调用了cancle方法后是false    if (!selectionKey.isValid()) {        return;    }
    readPending = true;    //感兴趣的操作码    final int interestOps = selectionKey.interestOps();    if ((interestOps & readInterestOp) == 0) {        selectionKey.interestOps(interestOps | readInterestOp);    }}

在上面的doRegister中NioServerSocketChannel向selector注册的ops为0,而NioServerSocketChannel的interestOps为SelectionKey.OPACCEPT(具体可以看看关于NioServerSocketChannel的那篇源码分析), 那么在这里的计算就是interestOps & readInterestOp = 0 & SelectionKey.OPACCEPT结果为0,而interestOps | readInterestOp= 0 | OPACCEPT 结果为SelectionKey.OPACCEPT。

5.2 ChannelFuture

是jdk Future的子类,主要提供了添加一些监听器,进行同步、等待等操作的方法:

5.3 doBind0(regFuture, channel, localAddress, promise);

我们来看下io.netty.bootstrap.AbstractBootstrap#doBind0方法:

private static void doBind0(        final ChannelFuture regFuture, final Channel channel,        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up    // the pipeline in its channelRegistered() implementation.    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            //在上面的注册工作完成之后,才进行bind操作            if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}

我们看下channel的bind方法,io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

 @Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {    return pipeline.bind(localAddress, promise);}

io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

 @Override    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return tail.bind(localAddress, promise);    }

继续往下看,调到io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

 @Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {    if (localAddress == null) {        throw new NullPointerException("localAddress");    }    if (isNotValidPromise(promise, false)) {        // cancelled        return promise;    }
    final AbstractChannelHandlerContext next = findContextOutbound();    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeBind(localAddress, promise);    } else {        safeExecute(executor, new Runnable() {            @Override            public void run() {                next.invokeBind(localAddress, promise);            }        }, promise, null);    }    return promise;}

io.netty.channel.AbstractChannelHandlerContext#invokeBind:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {    if (invokeHandler()) {        try {            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);        } catch (Throwable t) {            notifyOutboundHandlerException(t, promise);        }    } else {        bind(localAddress, promise);    }}

然后进入io.netty.channel.DefaultChannelPipeline.HeadContext#bind:

@Overridepublic void bind(        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)        throws Exception {    unsafe.bind(localAddress, promise);}

然后进入的是NioMessageUnsafe,它的bind方法是io.netty.channel.AbstractChannel.AbstractUnsafe#bind:

  @Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop();
 if (!promise.setUncancellable() || !ensureOpen(promise)) {     return; }
 // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&     localAddress instanceof InetSocketAddress &&     !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&     !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {     // Warn a user about the fact that a non-root user can't receive a     // broadcast packet on *nix if the socket is bound on non-wildcard address.     logger.warn(             "A non-root user can't receive a broadcast packet if the socket " +             "is not bound to a wildcard address; binding to a non-wildcard " +             "address (" + localAddress + ") anyway as requested."); } //绑定之后就会激活,这时候wasActive返回的应该是false,执行doBind方法之后isActive()返回的应该是true boolean wasActive = isActive(); try {     doBind(localAddress); } catch (Throwable t) {     safeSetFailure(promise, t);     closeIfClosed();     return; } //在绑定之后执行pipeline.fireChannelActive方法 if (!wasActive && isActive()) {     invokeLater(new Runnable() {         @Override         public void run() {             pipeline.fireChannelActive();         }     }); }
 safeSetSuccess(promise);}

这里的isActive方法对应的是io.netty.channel.socket.nio.NioServerSocketChannel#isActive:

  @Override    public boolean isActive() {        return javaChannel().socket().isBound();    }

调用的是jdk底层的ServerSocket的isBound方法,可见在执行doBind方法之前wasActive的值为false,执行了doBind方法之后,这个值为true,然后会执行pipeline.fireChannelActive()。

这里的doBind方法调用的是io.netty.channel.socket.nio.NioServerSocketChannel#doBind:

@Override    protected void doBind(SocketAddress localAddress) throws Exception {        if (PlatformDependent.javaVersion() >= 7) {            javaChannel().bind(localAddress, config.getBacklog());        } else {            javaChannel().socket().bind(localAddress, config.getBacklog());        }    }

可以看出,最终还是调jdk底层的ServerSocketChannel来进行bind操作的。

ServerBootstrapAcceptor

关于ServerBootstrapAcceptor,我们关注一下它的channelRead方法:

@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {    //关于这里为什么传入的是Channel,上面已经提到过    final Channel child = (Channel) msg;    //这里把ServerBootstrap传入的childHandler添加到pipeline中    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());    }
    try {        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

这里主要分析三点:

  • 关于这里为什么传入的是msg能强转Channel,上面已经提到过。
  • child.pipeline().addLast(childHandler) 这里是将外面自定义的childHandler添加到每个连接对应的SocketChannel中去,在内部会判断该handler是不是Sharable的(如果是Shareable的handler可以使用单例哦)
  • childGroup.register(child)注册,我们来分析一下:
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):
  @Override    public ChannelFuture register(Channel channel) {        return next().register(channel);    }

这里的next()对应的是什么呢,我们来一层层看看:

  @Override    public EventLoop next() {        return (EventLoop) super.next();    }

然后进去:

  @Override    public EventExecutor next() {        return chooser.next();    }

这个chooser就是我们在源码分析二中重点讲过的EventExecutorChooserFactory.EventExecutorChooser,它是在new NioEventLoopGroup(nThreads)的时候初始化的, 初始化的数组Executor[nThreads],它的next()方法是从这个Exector数组中顺序随机取一个Exector,也就是取一个EventLoop。

然后我们分析一下register方法,调用的是NioEventLoop父类的io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):

@Overridepublic ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) {  ObjectUtil.checkNotNull(promise, "promise");  promise.channel().unsafe().register(this, promise);  return promise;}  

可以看到最后调用的是channel的unsafe来进行注册的操作的。

总结

服务端的启动流程到这里就分析结束了,这里我们总结一下,并对其中比较关键的几个点进行一个总结。初始化过程主要分为以下几步: 初始化NioEventLoopGroup,里面包括NioEventLoop的初始化操作。 创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等。初始化server对应的channel, 设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件。 初始化NioServerSocketChannel,并初始化unsafe(NioMessageUnsafe)和pipeline,并在pipeline中添加ServerBootstrapAcceptor等。 注册时通过selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)将NioServerSocketChannel添加到SelectionKey的attachment中。如果是第一次注册且处于激活状态则会触发pipeline的fireChannelActive()方法,否则如果设置了autoRead则会触发beginRead方法并向selector上注册SelectionKey.OP_ACCEPT。 NioEventLoop的execute方法在外部线程调用时会先启动startThread来启动一个线程,在线程中调用run()方法,而run方法是一个死循环,不停地轮询selectKey的信息,这里只是简单提一下,之后会详细分析。需要注 意的是初始化是main线程执行的,也就是说在初始化的时候就会调用startThread方法打开reactor线程,然后开始select selectKey的事件。 startThread开启的线程是通过ThreadPerTaskExecutor.execute来完成的,ThreadPerTaskExecutor 在每次执行execute方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体。 NioEventLoop的run方法中会循环轮询processSelectedKey方法,在processSelectedKey方法中监听到ACCEPT事件后会调用 unsafe.read()方法,在 unsafe.read()方法中会调用NioMessageUnsafe的read方法。 在NioMessageUnsafe的read方法中会为每个客户端创建NioSocketChannel并加入到List readBuf中,然后触发pipeline的fireChannelRead方法,最终将NioSocketChannel对象传入到 void channelRead(ChannelHandlerContext ctx, Object msg)的msg参数, 这也是为什么在ServerBootstrapAcceptor的channelRead方法第二个参数是NioSocketChannel对象。 ServerBootstrapAcceptor是reactor线程的acceptor,IO操作分发给具体channel的都是通过acceptor线程来进行的。

本文分享自微信公众号 - 开发架构二三事(gh_d6f166e26398)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我是攻城师

关于zookeeper写入数据超过1M大小的踩坑记

某天晚上集群的一个任务提交一直失败,经过排查日志,发现是zk客户端写入的数据包过大,导致报错。我们来看下,这中间发生了什么。

40530
来自专栏京程一灯

更轻量级的 V8 引擎[每日前端夜话0xC8]

作者:Mythri Alle, Dan Elphick, and Ross McIlroy

9620
来自专栏路人甲Java

玩转Mysql系列 - 第13篇:详解事务

数据库中的事务是指对数据库执行一批操作,这些操作最终要么全部执行成功,要么全部失败,不会存在部分成功的情况。

6820
来自专栏京程一灯

JavaScript中的异步生成器函数[每日前端夜话0xC9]

TC39异步迭代器提案 将 for/await/of 引入了 JavaScript【http://thecodebarbarian.com/getting-st...

5620
来自专栏Java3y

Java基础知识点面试手册(线程+JDK8)

在单核 CPU 中,将 CPU 分为很小的时间片,在每一时刻只能有一个线程在执行,是一种微观上轮流占用 CPU 的机制。

8320
来自专栏架构圈的那些事

Golang横空出世的背景(为什么选择Go)

最近十年来,C/C++在计算领域没有很好得到发展,并没有新的系统编程语言出现。对开发程度和系统效率在很多情况下不能兼得。要么执行效率高,但低效的开发和编译,如C...

9240
来自专栏Java那些事

2019秋招:460道Java后端面试高频题答案版【模块七:设计模式】

1. 设计模式虽然在面试中没有计算机网络协议和操作系统那么重要,但是也是会被经常问到的。设计模式除了在 JDK 和一些框架中被大量使用到外,在日常开发中也是很常...

14760
来自专栏JAVA葵花宝典

看完你就应该能明白的悲观锁和乐观锁

Java 按照锁的实现分为乐观锁和悲观锁,乐观锁和悲观锁并不是一种真实存在的锁,而是一种设计思想,乐观锁和悲观锁对于理解 Java 多线程和数据库来说至关重要,...

7820
来自专栏好好学java的技术栈

告诉你一种精简、优化代码的方式

相对于Java8之前的Java的相关操作简直是天差地别,Java8 的流式操作的出现,也很大程度上改变了开发者对于Java的繁琐的操作的印象,从此,Java也走...

10020
来自专栏JAVA葵花宝典

SpringBoot优雅地发送邮件

消息通知的形式也有很多,比如:短信、邮件、app推送等,本文主要给大家描述一下邮件通知的形式,因为邮件相比较其他通知渠道更方便实用(免费),除了简单文本邮件(已...

8120

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励