前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty源码分析之三Bootstrap初始化

netty源码分析之三Bootstrap初始化

作者头像
山行AI
发布2019-10-24 14:49:42
1.1K0
发布2019-10-24 14:49:42
举报
文章被收录于专栏:山行AI山行AI

Bootstrap是netty的启动配置类,里面涉及到大量的netty的初始化操作,本篇来分析下它的启动流程

netty启动代码

代码语言:javascript
复制
public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .childHandler(new WebSocketServerInitializer(sslCtx));
            Channel ch = bootstrap.bind(PORT).sync().channel();
            System.out.println("Open your web browser and navigate to " +                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
            ch.closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
  • ServerBootstrap主要的启动配置类
  • ServerBootstrap.bind方法主要处理一些端口绑定,channel注册等操作,是核心方法

下面我们一一来看看。

ServerBootStrap初始化

1. 构造方法:

代码语言:javascript
复制
public ServerBootstrap() { }

属性主要有:

代码语言:javascript
复制
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);    private volatile EventLoopGroup childGroup;    private volatile ChannelHandler childHandler;

2. bootstrap.group(bossGroup, workerGroup)

代码语言:javascript
复制
  /**     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and     * {@link Channel}'s.     */    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {        super.group(parentGroup);        if (childGroup == null) {            throw new NullPointerException("childGroup");        }        if (this.childGroup != null) {            throw new IllegalStateException("childGroup set already");        }        this.childGroup = childGroup;        return this;    }

可以看到childGroup也就是group方法传入的workerGroup是赋值给ServerBootstrap的childGroup属性的。我们进入 super.group(parentGroup)也就是io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup):

代码语言:javascript
复制
 /**     * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created     * {@link Channel}     */    @SuppressWarnings("unchecked")    public B group(EventLoopGroup group) {        if (group == null) {            throw new NullPointerException("group");        }        if (this.group != null) {            throw new IllegalStateException("group set already");        }        this.group = group;        return (B) this;    }

我们来看一下AbstractBootstrap的类概览,主要看一下属性和构造:

可以看到bossGroup是赋值给AbstractBootstrap的group属性的。

3. bootstrap.channel(NioServerSocketChannel.class)

关于这个方法在netty源码分析一之NioServerSocketChannel中已经完整地介绍过,这里不再赘述,它的目的是创建一个泛型为NioServerSocketChannel的ReflectiveChannelFactory。

4. bootstrap.childHandler(new WebSocketServerInitializer(sslCtx))

代码语言:javascript
复制
    /**     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.     */    public ServerBootstrap childHandler(ChannelHandler childHandler) {        if (childHandler == null) {            throw new NullPointerException("childHandler");        }        this.childHandler = childHandler;        return this;    }

在这里的主要作用是赋值给ServerBootstrap的属性childHandler。

5. bootstrap.bind(port)

先看一下bind方法:

代码语言:javascript
复制
  /**     * Create a new {@link Channel} and bind it.     */    public ChannelFuture bind(int inetPort) {        return bind(new InetSocketAddress(inetPort));    }

继续往下看:

代码语言:javascript
复制
  /**     * Create a new {@link Channel} and bind it.     */    public ChannelFuture bind(SocketAddress localAddress) {        validate();        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        return doBind(localAddress);    }

我们继续来看doBind方法:

代码语言:javascript
复制
private ChannelFuture doBind(final SocketAddress localAddress) {    //初始化并进行注册    final ChannelFuture regFuture = initAndRegister();    //获取channel    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }
    if (regFuture.isDone()) {        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // Registration future is almost always fulfilled already, but just in case it's not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}

这个方法里面我们主要需要分析三点: initAndRegister()方法、ChannelFuture和doBind0(regFuture, channel, localAddress, promise)方法。

5.1 initAndRegister()

这个方法的主要作用是初始化和注册。我们来看具体代码io.netty.bootstrap.AbstractBootstrap#initAndRegister:

代码语言:javascript
复制
final ChannelFuture initAndRegister() {    //这个channel是ServerSocketChannel    Channel channel = null;    try {        channel = channelFactory.newChannel();        System.out.println("channel:" + channel);        //1.初始化channel        init(channel);    } catch (Throwable t) {        if (channel != null) {            // channel can be null if newChannel crashed (eg SocketException("too many open files"))            channel.unsafe().closeForcibly();        }        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);    }    //2. register channel    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }
    // If we are here and the promise is not failed, it's one of the following cases:    // 1) If we attempted registration from the event loop, the registration has been completed at this point.    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.    // 2) If we attempted registration from the other thread, the registration request has been successfully    //    added to the event loop's task queue for later execution.    //    i.e. It's safe to attempt bind() or connect() now:    //         because bind() or connect() will be executed *after* the scheduled registration task is executed    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;}

这段代码主要分三步:

代码语言:javascript
复制
1. 使用channelFactory创建ServerSocketChannel2. init(channel) 初始化channel3. config().group().register(channel)初始化channel

创建channel的部分我们在分析一中已经介绍过了,接下来我们来一步步地看后面的步骤: 首先init(channel),实际调用的是io.netty.bootstrap.ServerBootstrap#init方法:

代码语言:javascript
复制
@Overridevoid init(Channel channel) throws Exception {    final Map<ChannelOption<?>, Object> options = options0();    synchronized (options) {        //给ServerSocketChannel设置一些options        setChannelOptions(channel, options, logger);    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();    synchronized (attrs) {        //将在ServerBootstrap中携带的attr放入到channel的attr中        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {            @SuppressWarnings("unchecked")            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();            channel.attr(key).set(e.getValue());        }    }    //获取channel的pipeline,这时候只有head和tail节点,还没有加入用户自定义的handler    ChannelPipeline p = channel.pipeline();    //这里的childGroup是指外层传入的workerGroup    final EventLoopGroup currentChildGroup = childGroup;    //这个是外层传入的childHandler(new WebSocketServerInitializer(sslCtx))    final ChannelHandler currentChildHandler = childHandler;    final Entry<ChannelOption<?>, Object>[] currentChildOptions;    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;    synchronized (childOptions) {        //设置options        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));    }    synchronized (childAttrs) {        //设置attrs        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));    }    //向传入的ServerSocketChannel的pipline中添加handler,这里就像在最外层通过initializer方式添加channel handler一样,这里相当于添加了一个acceptor    p.addLast(new ChannelInitializer<Channel>() {        @Override        public void initChannel(final Channel ch) throws Exception {            System.out.println("channel:" + ch + " start init channel!");            //这里获取到的相当于是nio中的SocketChannel            final ChannelPipeline pipeline = ch.pipeline();            //在SocketChannel中添加bootstrap.handler(),我们的示例中添加的是childHandler(new WebSocketServerInitializer(sslCtx)),所以这里为null            ChannelHandler handler = config.handler();            if (handler != null) {                pipeline.addLast(handler);            }            //调用SocketChannel的eventLoop,向里面添加一个往pipeline中添加ServerBootstrapAcceptor的内部任务            //这个是在回调时调用的,所以是内部的eventLoop调用            ch.eventLoop().execute(new Runnable() {                @Override                public void run() {                //这里将ServerSocketChannel和bootstrap传入的workerGroup,childHandler等都传入                    pipeline.addLast(new ServerBootstrapAcceptor(                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}

在里面进行的是一系列的初始化操作,其中在ServerSocketChannel的pipeline中添加了一个ChannelInitializer,在ChannelInitializer内部进行的是 往每个连接对应的SocketChannel的pipeline中添加一个ServerBootstrapAcceptor。关于ServerBootstrapAcceptor我们后面再专门来讲一讲。

这里我们先来看一看config().group().register(channel),也就是ServerSocketChannel的注册部分: 我们先看一看io.netty.bootstrap.AbstractBootstrapConfig#group方法:

代码语言:javascript
复制
  public final EventLoopGroup group() {        //对bootstrap进行了包装        return bootstrap.group();    }

这里返回的是bootstrap.group也就是外面传入的bossGroup,即大名鼎鼎的reactor线程。这里的操作实际上是用reactor线程来注册ServerSocketChannel:

看过源码分析二的会知道,这里会进入到io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)方法:

代码语言:javascript
复制
    @Override    public ChannelFuture register(Channel channel) {        //new DefaultChannelPromise(channel, this) 这里是将this作为DefaultChannelPromise的executor        return register(new DefaultChannelPromise(channel, this));    }
    @Override    public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        //register方法传入的EventLoop就是当前的这个boosEventGroup对应的reactor线程        promise.channel().unsafe().register(this, promise);        return promise;    }

注意这里的channel对应的是NioServerSocketChannel,它的unsafe方法为io.netty.channel.nio.AbstractNioChannel#unsafe:

代码语言:javascript
复制
  @Override    public NioUnsafe unsafe() {        return (NioUnsafe) super.unsafe();    }

然后调用的是io.netty.channel.AbstractChannel#unsafe:

代码语言:javascript
复制
 @Override    public Unsafe unsafe() {        return unsafe;    }

这里的unsafe初始化在源码分析一中已经分析过,这里简单提一下:

代码语言:javascript
复制
  protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }

最终对应的是io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe方法:

代码语言:javascript
复制
 @Override    protected AbstractNioUnsafe newUnsafe() {        return new NioMessageUnsafe();    }

register调用的也就是NioMessageUnsafe的register方法,它对应的是io.netty.channel.AbstractChannel.AbstractUnsafe的register方法,其中AbstractUnsafe是AbstractChannel的内部类,register方法为:

代码语言:javascript
复制
   /**     * 1. 首先通过isRegistered() 判断该 Channel 是否已经注册到 EventLoop 中;     * 2. 通过 eventLoop.inEventLoop() 来判断当前线程是否为该 EventLoop 自身发起的,如果是,则调用 register0() 直接注册;     * 3. 如果不是,说明该 EventLoop 中的线程此时没有执行权,则需要新建一个线程,单独封装一个 Task,而该 Task 的主要任务则是执行 register0()。     * @param eventLoop     * @param promise     */    @Override    public final void register(EventLoop eventLoop, final ChannelPromise promise) {        if (eventLoop == null) {            throw new NullPointerException("eventLoop");        }        //查看一下当前channel是不是已经注册过了        if (isRegistered()) {            promise.setFailure(new IllegalStateException("registered to an event loop already"));            return;        }        if (!isCompatible(eventLoop)) {            promise.setFailure(                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));            return;        }        //这里是传入的eventLoop        AbstractChannel.this.eventLoop = eventLoop;        //如果是内部线程        if (eventLoop.inEventLoop()) {            register0(promise);        } else {            try {                //如果不是内部线程                eventLoop.execute(new Runnable() {                    @Override                    public void run() {                        register0(promise);                    }                });            } catch (Throwable t) {                logger.warn(                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",                        AbstractChannel.this, t);                closeForcibly();                closeFuture.setClosed();                safeSetFailure(promise, t);            }        }    }

如果是内部线程,直接执行register0方法,如果不是则进入下面的方法,这里我们先分析下eventLoop.inEventLoop()对应的是io.netty.util.concurrent.AbstractEventExecutor#inEventLoop:

代码语言:javascript
复制
   @Override    public boolean inEventLoop() {        return inEventLoop(Thread.currentThread());    }

这里的Thread.currentThread()为bind对应的线程即main线程,io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop:

代码语言:javascript
复制
  @Override    public boolean inEventLoop(Thread thread) {        return thread == this.thread;    }

可以看到,其实这里使用的不是内部线程,也就是说会进入下面的方法。我们先看一看eventLoop.execute(new Runnable())方法对应的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

代码语言:javascript
复制
 @Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            startThread();            //将传入的任务添加到队列中            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }

这里会进入 startThread()和addTask(task)方法。我们这里多说几句,继续看startThread方法:

代码语言:javascript
复制
private void startThread() {    //检查下线程是否已经启动    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            doStartThread();        }    }}

doStartThread方法:

代码语言:javascript
复制
  /** * SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法, * 将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行 */private void doStartThread() {    assert thread == null;    //executor 默认是ThreadPerTaskExecutor    //该线程就是executor创建,对应netty的reactor线程实体    //默认情况下,ThreadPerTaskExecutor 在每次执行execute    // 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体    executor.execute(new Runnable() {        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }
            boolean success = false;            updateLastExecutionTime();            try {                //调用的是io.netty.channel.nio.NioEventLoop.run                SingleThreadEventExecutor.this.run();                success = true;            } catch (Throwable t) {                logger.warn("Unexpected exception from an event executor: ", t);            } finally {                for (;;) {                    int oldState = state;                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {                        break;                    }                }    ........

executor.execute方法调用的是:

代码语言:javascript
复制
  /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */    static final class ThreadPerTaskExecutor implements Executor {        public void execute(Runnable r) { new Thread(r).start(); }    }

这里我们主要分析一下SingleThreadEventExecutor.this.run()调用的是io.netty.channel.nio.NioEventLoop#run:

代码语言:javascript
复制
 @Override    protected void run() {        for (;;) {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }            } catch (Throwable t) {                handleLoopException(t);            }            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                handleLoopException(t);            }        }    }

关于这个方法,我们后面会专门来讲,这里主要看一下processSelectedKeys()方法:

代码语言:javascript
复制
   private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

最终调用的是:

代码语言:javascript
复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();    int readyOps = k.readyOps();    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {        unsafe.read();    }}

我们知道,NioServerSocketChannel注册的是ACCEPT事件,当有read事件进入时,会进入unsafe.read(),调用的是NioMessageUnsafe的read():

代码语言:javascript
复制
private final List<Object> readBuf = new ArrayList<Object>();
public void read() {    assert eventLoop().inEventLoop();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    do {        int localRead = doReadMessages(readBuf);        if (localRead == 0) {            break;        }        if (localRead < 0) {            closed = true;            break;        }    } while (allocHandle.continueReading());    int size = readBuf.size();    for (int i = 0; i < size; i ++) {        pipeline.fireChannelRead(readBuf.get(i));    }    readBuf.clear();    pipeline.fireChannelReadComplete();}

接下来,调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead()。

  • 我们看下io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,向readBuf里放入的是NioSocketChannel对象,这个在我们讲客户端接入时再详细分析:
代码语言:javascript
复制
 @Override    protected int doReadMessages(List<Object> buf) throws Exception {        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {            if (ch != null) {                buf.add(new NioSocketChannel(this, ch));                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }
        return 0;    }
  • 这里尤其需要注意的一点是pipeline.fireChannelRead(readBuf.get(i))方法的参数其实是一个channel,对应nio中的nioSocketChannel,在下面的ServerBootstrapAcceptor的channelRead(Object msg)方法中传入的msg就是它。

继续来讲关于register0方法:

代码语言:javascript
复制
 private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        //默认值是true        boolean firstRegistration = neverRegistered;        //进行注册        doRegister();        //注册结束后设为false        neverRegistered = false;        //将状态改成已注册        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        // 添加用户添加的PendingHandlerAddedTask        pipeline.invokeHandlerAddedIfNeeded();        //确保promise设置成功        safeSetSuccess(promise);        //回调头节点的channelRegistered方法        pipeline.fireChannelRegistered();        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        //如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。        // 最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()        if (isActive()) {            ////如果是首次注册,发起 pipeline 的 fireChannelActive            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                //如果设置了autoRead,这里就开始读取数据了。注意下ServerBootstrapAcceptor中有设置channel为autoRead true的操作                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

也就是说如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive(),而如果是注册过的且处于激活状态则会调用beginRead方法,注册ACCEPT事件。

关于auto read,我们需要关注下ServerBootstrapAcceptor部分的操作:

接下来我们看一看doRegister方法,io.netty.channel.nio.AbstractNioChannel#doRegister方法如下:

代码语言:javascript
复制
 @Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            //这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作,这样做的目的是(摘自《Netty权威指南(第二版)》):            //            //注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。            //通过 SelectionKey.interestOps(int ops) 方法可以方便地修改监听操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量            // selectionKey 赋值。            //由于这里 ops 设置为 0,所以还不能监听读写事件。调用 doRegister()后,然后调用pipeline.invokeHandlerAddedIfNeeded();,            // 这个时候控制台会出现 loggin-handlerAdded,内部如何调用,            // 我们在剖析 pipeline 时再做详细分析。然后将注册结果设置为成功(safeSetSuccess(promise))。调用 pipeline.fireChannelRegistered();            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

这里NioServerSocketChannel向selector上注册的ops是0;

关于beginRead()方法,最终调用的是io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead,在这个方法内部调用的是io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead方法, 继续往下找,调用的是io.netty.channel.nio.AbstractNioChannel#doBeginRead:

代码语言:javascript
复制
@Overrideprotected void doBeginRead() throws Exception {    // Channel.read() or ChannelHandlerContext.read() was called    //这里是nio的selectionKey    final SelectionKey selectionKey = this.selectionKey;    //selectionKey.isValid()默认是true,当调用了cancle方法后是false    if (!selectionKey.isValid()) {        return;    }
    readPending = true;    //感兴趣的操作码    final int interestOps = selectionKey.interestOps();    if ((interestOps & readInterestOp) == 0) {        selectionKey.interestOps(interestOps | readInterestOp);    }}

在上面的doRegister中NioServerSocketChannel向selector注册的ops为0,而NioServerSocketChannel的interestOps为SelectionKey.OPACCEPT(具体可以看看关于NioServerSocketChannel的那篇源码分析), 那么在这里的计算就是interestOps & readInterestOp = 0 & SelectionKey.OPACCEPT结果为0,而interestOps | readInterestOp= 0 | OPACCEPT 结果为SelectionKey.OPACCEPT。

5.2 ChannelFuture

是jdk Future的子类,主要提供了添加一些监听器,进行同步、等待等操作的方法:

5.3 doBind0(regFuture, channel, localAddress, promise);

我们来看下io.netty.bootstrap.AbstractBootstrap#doBind0方法:

代码语言:javascript
复制
private static void doBind0(        final ChannelFuture regFuture, final Channel channel,        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up    // the pipeline in its channelRegistered() implementation.    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            //在上面的注册工作完成之后,才进行bind操作            if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}

我们看下channel的bind方法,io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

代码语言:javascript
复制
 @Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {    return pipeline.bind(localAddress, promise);}

io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

代码语言:javascript
复制
 @Override    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return tail.bind(localAddress, promise);    }

继续往下看,调到io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise):

代码语言:javascript
复制
 @Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {    if (localAddress == null) {        throw new NullPointerException("localAddress");    }    if (isNotValidPromise(promise, false)) {        // cancelled        return promise;    }
    final AbstractChannelHandlerContext next = findContextOutbound();    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeBind(localAddress, promise);    } else {        safeExecute(executor, new Runnable() {            @Override            public void run() {                next.invokeBind(localAddress, promise);            }        }, promise, null);    }    return promise;}

io.netty.channel.AbstractChannelHandlerContext#invokeBind:

代码语言:javascript
复制
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {    if (invokeHandler()) {        try {            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);        } catch (Throwable t) {            notifyOutboundHandlerException(t, promise);        }    } else {        bind(localAddress, promise);    }}

然后进入io.netty.channel.DefaultChannelPipeline.HeadContext#bind:

代码语言:javascript
复制
@Overridepublic void bind(        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)        throws Exception {    unsafe.bind(localAddress, promise);}

然后进入的是NioMessageUnsafe,它的bind方法是io.netty.channel.AbstractChannel.AbstractUnsafe#bind:

代码语言:javascript
复制
  @Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop();
 if (!promise.setUncancellable() || !ensureOpen(promise)) {     return; }
 // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&     localAddress instanceof InetSocketAddress &&     !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&     !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {     // Warn a user about the fact that a non-root user can't receive a     // broadcast packet on *nix if the socket is bound on non-wildcard address.     logger.warn(             "A non-root user can't receive a broadcast packet if the socket " +             "is not bound to a wildcard address; binding to a non-wildcard " +             "address (" + localAddress + ") anyway as requested."); } //绑定之后就会激活,这时候wasActive返回的应该是false,执行doBind方法之后isActive()返回的应该是true boolean wasActive = isActive(); try {     doBind(localAddress); } catch (Throwable t) {     safeSetFailure(promise, t);     closeIfClosed();     return; } //在绑定之后执行pipeline.fireChannelActive方法 if (!wasActive && isActive()) {     invokeLater(new Runnable() {         @Override         public void run() {             pipeline.fireChannelActive();         }     }); }
 safeSetSuccess(promise);}

这里的isActive方法对应的是io.netty.channel.socket.nio.NioServerSocketChannel#isActive:

代码语言:javascript
复制
  @Override    public boolean isActive() {        return javaChannel().socket().isBound();    }

调用的是jdk底层的ServerSocket的isBound方法,可见在执行doBind方法之前wasActive的值为false,执行了doBind方法之后,这个值为true,然后会执行pipeline.fireChannelActive()。

这里的doBind方法调用的是io.netty.channel.socket.nio.NioServerSocketChannel#doBind:

代码语言:javascript
复制
@Override    protected void doBind(SocketAddress localAddress) throws Exception {        if (PlatformDependent.javaVersion() >= 7) {            javaChannel().bind(localAddress, config.getBacklog());        } else {            javaChannel().socket().bind(localAddress, config.getBacklog());        }    }

可以看出,最终还是调jdk底层的ServerSocketChannel来进行bind操作的。

ServerBootstrapAcceptor

关于ServerBootstrapAcceptor,我们关注一下它的channelRead方法:

代码语言:javascript
复制
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {    //关于这里为什么传入的是Channel,上面已经提到过    final Channel child = (Channel) msg;    //这里把ServerBootstrap传入的childHandler添加到pipeline中    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());    }
    try {        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

这里主要分析三点:

  • 关于这里为什么传入的是msg能强转Channel,上面已经提到过。
  • child.pipeline().addLast(childHandler) 这里是将外面自定义的childHandler添加到每个连接对应的SocketChannel中去,在内部会判断该handler是不是Sharable的(如果是Shareable的handler可以使用单例哦)
  • childGroup.register(child)注册,我们来分析一下:
代码语言:javascript
复制
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):
  @Override    public ChannelFuture register(Channel channel) {        return next().register(channel);    }

这里的next()对应的是什么呢,我们来一层层看看:

代码语言:javascript
复制
  @Override    public EventLoop next() {        return (EventLoop) super.next();    }

然后进去:

代码语言:javascript
复制
  @Override    public EventExecutor next() {        return chooser.next();    }

这个chooser就是我们在源码分析二中重点讲过的EventExecutorChooserFactory.EventExecutorChooser,它是在new NioEventLoopGroup(nThreads)的时候初始化的, 初始化的数组Executor[nThreads],它的next()方法是从这个Exector数组中顺序随机取一个Exector,也就是取一个EventLoop。

然后我们分析一下register方法,调用的是NioEventLoop父类的io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):

代码语言:javascript
复制
@Overridepublic ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) {  ObjectUtil.checkNotNull(promise, "promise");  promise.channel().unsafe().register(this, promise);  return promise;}  

可以看到最后调用的是channel的unsafe来进行注册的操作的。

总结

服务端的启动流程到这里就分析结束了,这里我们总结一下,并对其中比较关键的几个点进行一个总结。初始化过程主要分为以下几步: 初始化NioEventLoopGroup,里面包括NioEventLoop的初始化操作。 创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等。初始化server对应的channel, 设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件。 初始化NioServerSocketChannel,并初始化unsafe(NioMessageUnsafe)和pipeline,并在pipeline中添加ServerBootstrapAcceptor等。 注册时通过selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)将NioServerSocketChannel添加到SelectionKey的attachment中。如果是第一次注册且处于激活状态则会触发pipeline的fireChannelActive()方法,否则如果设置了autoRead则会触发beginRead方法并向selector上注册SelectionKey.OP_ACCEPT。 NioEventLoop的execute方法在外部线程调用时会先启动startThread来启动一个线程,在线程中调用run()方法,而run方法是一个死循环,不停地轮询selectKey的信息,这里只是简单提一下,之后会详细分析。需要注 意的是初始化是main线程执行的,也就是说在初始化的时候就会调用startThread方法打开reactor线程,然后开始select selectKey的事件。 startThread开启的线程是通过ThreadPerTaskExecutor.execute来完成的,ThreadPerTaskExecutor 在每次执行execute方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体。 NioEventLoop的run方法中会循环轮询processSelectedKey方法,在processSelectedKey方法中监听到ACCEPT事件后会调用 unsafe.read()方法,在 unsafe.read()方法中会调用NioMessageUnsafe的read方法。 在NioMessageUnsafe的read方法中会为每个客户端创建NioSocketChannel并加入到List readBuf中,然后触发pipeline的fireChannelRead方法,最终将NioSocketChannel对象传入到 void channelRead(ChannelHandlerContext ctx, Object msg)的msg参数, 这也是为什么在ServerBootstrapAcceptor的channelRead方法第二个参数是NioSocketChannel对象。 ServerBootstrapAcceptor是reactor线程的acceptor,IO操作分发给具体channel的都是通过acceptor线程来进行的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • netty启动代码
    • ServerBootStrap初始化
      • 1. 构造方法:
      • 2. bootstrap.group(bossGroup, workerGroup)
      • 3. bootstrap.channel(NioServerSocketChannel.class)
      • 4. bootstrap.childHandler(new WebSocketServerInitializer(sslCtx))
      • 5. bootstrap.bind(port)
    • ServerBootstrapAcceptor
    • 总结
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档