前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 启动流程解析

Netty 启动流程解析

作者头像
luoxn28
发布2019-11-14 17:32:07
6950
发布2019-11-14 17:32:07
举报
文章被收录于专栏:TopCoderTopCoder

Netty的启动流程(ServerBootstrap),就是创建NioEventLoopGroup(内部可能包含多个NioEventLoop,每个eventLoop是一个线程,内部包含一个FIFO的taskQueue和Selector)和ServerBootstrap实例,并进行bind的过程(bind流程涉及到channel的创建和注册),之后就可以对外提供服务了。

Netty的启动流程中,涉及到多个操作,比如register、bind、注册对应事件等,为了不影响main线程执行,这些工作以task的形式提交给NioEventLoop,由NioEventLoop来执行这些task,也就是register、bind、注册事件等操作。

NioEventLoop(准确来说是SingleThreadEventExecutor)中包含了private volatile Thread thread,该thread变量的初始化是在new的线程第一次执行run方式时才赋值的,这种形式挺新颖的。

Netty启动流程图如下所示:

大致了解了Netty启动流程之后,下面就按照Netty启动流程中涉及到的源码来进行分析。

netty启动流程分为server端和client端,不同之处就是前者监听端口,对外提供服务(socket->bind->listen操作),对应类ServerBootstrap;后者主动去连接远端端口(socket->connect),对应类Bootstrap。

server端启动流程

server端启动流程可以理解成创建ServerBootstrap实例的过程,就以下面代码为例进行分析(echo服务):

代码语言:javascript
复制
 1public final class EchoServer {
 2    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
 3
 4    public static void main(String[] args) throws Exception {
 5        // bossGroup处理connect事件
 6        // workerGroup处理read/write事件
 7        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 8        EventLoopGroup workerGroup = new NioEventLoopGroup();
 9        EchoServerHandler serverHandler = new EchoServerHandler();
10        try {
11            ServerBootstrap b = new ServerBootstrap();
12            b.group(bossGroup, workerGroup)
13             .channel(NioServerSocketChannel.class)
14             .option(ChannelOption.SO_BACKLOG, 100)
15             .handler(new LoggingHandler(LogLevel.INFO))
16             .childHandler(new ChannelInitializer<SocketChannel>() {
17                 @Override
18                 public void initChannel(SocketChannel ch) throws Exception {
19                     // 当连接建立后(register到childWorkerGroup前)初始化channel.pipeline
20                     ch.pipeline().addLast(serverHandler);
21                 }
22             });
23
24            // Start the server.
25            ChannelFuture f = b.bind(PORT).sync();
26            // Wait until the server socket is closed.
27            f.channel().closeFuture().sync();
28        } finally {
29            // Shut down all event loops to terminate all threads.
30            bossGroup.shutdownGracefully();
31            workerGroup.shutdownGracefully();
32        }
33    }
34}
35
36public class EchoServerHandler extends ChannelInboundHandlerAdapter {
37    @Override
38    public void channelRead(ChannelHandlerContext ctx, Object msg) {
39        ctx.write(msg);
40    }
41
42    @Override
43    public void channelReadComplete(ChannelHandlerContext ctx) {
44        ctx.flush();
45    }
46
47    @Override
48    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
49        // Close the connection when an exception is raised.
50        cause.printStackTrace();
51        ctx.close();
52    }
53}

EventLoopGroup创建

EventLoopGroup中可能包含了多个EventLoop,EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册和ServerSocket绑定操作等。关于NioEventLoop,后续专门写一篇文章分析,这里就不再展开,只需知道个大概即可,其架构图如下:

EventLoopGroup创建本质就是创建多个NioEventLoop,这里创建NioEventLoop就是初始化一个Reactor,包括selector和taskQueue。主要逻辑如下:

代码语言:javascript
复制
 1protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
 2                                            EventExecutorChooserFactory chooserFactory, Object... args) {
 3    // 创建NioEventLoop实例
 4    children = new EventExecutor[nThreads];
 5    // 初始化NioEventLoop,实际调用的是NioEventLoopGroup.newChild方法
 6    for (int i = 0; i < nThreads; i ++) {
 7        children[i] = newChild(executor, args);
 8    }
 9
10    // 多个NioEventLoop中选择策略
11    chooser = chooserFactory.newChooser(children);
12}
13
14NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
15                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
16    // 创建taskQueue
17    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
18    // 是不是很熟悉,java nio selector操作
19    provider = selectorProvider;
20    final SelectorTuple selectorTuple = openSelector();
21    selector = selectorTuple.selector;
22    unwrappedSelector = selectorTuple.unwrappedSelector;
23    selectStrategy = strategy;
24}

EventLoopGroup创建OK后,启动的第一步就算完成了,接下来该进行bind、listen操作了。

ServerBootstrap流程

bind操作

bind操作是ServerBootstrap流程重要的一环,bind流程涉及到NioChannel的创建、初始化和注册(到Selector),启动NioEventLoop,之后就可以对外提供服务了。

代码语言:javascript
复制
 1public ChannelFuture bind(SocketAddress localAddress) {
 2    validate(); // 参数校验
 3    return doBind(localAddress);
 4}
 5private ChannelFuture doBind(final SocketAddress localAddress) {
 6    // 1. 初始化注册操作
 7    final ChannelFuture regFuture = initAndRegister();
 8    final Channel channel = regFuture.channel();
 9    if (regFuture.cause() != null) {
10        return regFuture;
11    }
12
13    // 2. doBind0操作
14    if (regFuture.isDone()) {
15        // register已完成,这里直接调用doBind0
16        ChannelPromise promise = channel.newPromise();
17        doBind0(regFuture, channel, localAddress, promise);
18        return promise;
19    } else {
20        // register还未完成,注册listener回调,在回调中调用doBind0
21        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
22        regFuture.addListener(new ChannelFutureListener() {
23            /**
24             * channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,
25             * 会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法
26             */
27            @Override
28            public void operationComplete(ChannelFuture future) throws Exception {
29                Throwable cause = future.cause();
30                if (cause != null) {
31                    promise.setFailure(cause);
32                } else {
33                    promise.registered();
34                    doBind0(regFuture, channel, localAddress, promise);
35                }
36            }
37        });
38        return promise;
39    }
40}

这里涉及到2个操作,一个是channel的创建、初始化、注册操作,另一个是bind操作,下面兵分两路,分别来讲。

注意,这里如果main线程执行到regFuture.isDone()时,register还未完成,那么main线程是不会直接调用bind操作的,而是往regFuture上注册一个Listenner,这样channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法,进而在执行bind操作。

channel初始化、注册操作

代码语言:javascript
复制
 1final ChannelFuture initAndRegister() {
 2    Channel channel = null;
 3    try {
 4        // 1.创建(netty自定义)Channel实例,并初始化
 5        // channel为 NioServerSocketChannel 实例,NioServerSocketChannel的父类AbstractNioChannel保存有nio的ServerSocketChannel
 6        channel = channelFactory.newChannel();
 7        // 2.初始化channel()
 8        init(channel);
 9    } catch (Throwable t) {
10    }
11
12    // 3.向Selector注册channel
13    ChannelFuture regFuture = config().group().register(channel);
14    if (regFuture.cause() != null) {
15        if (channel.isRegistered()) {
16            channel.close();
17        } else {
18            channel.unsafe().closeForcibly();
19        }
20    }
21
22    return regFuture;
23}

这里重点关注下初始化channel流程,主要操作是设置channel属性、设置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注册到selector之后被回调的。

代码语言:javascript
复制
 1/**
 2 * 初始channel属性,也就是ChannelOption对应socket的各种属性。
 3 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以与Linux中的setsockopt函数对应起来。
 4 * 最后将ServerBootstrapAcceptor添加到对应channel的ChannelPipeline中。
 5 */
 6@Override
 7void init(Channel channel) throws Exception {
 8    final Map<ChannelOption<?>, Object> options = options0();
 9    synchronized (options) {
10        setChannelOptions(channel, options, logger);
11    }
12
13    ChannelPipeline p = channel.pipeline();
14    // 获取childGroup和childHandler,传递给ServerBootstrapAcceptor
15    final EventLoopGroup currentChildGroup = childGroup;
16    final ChannelHandler currentChildHandler = childHandler;
17    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
18    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
19    synchronized (childOptions) {
20        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
21    }
22    synchronized (childAttrs) {
23        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
24    }
25
26    p.addLast(new ChannelInitializer<Channel>() {
27        /**
28         * 在register0中,将channel注册到Selector之后,会调用invokeHandlerAddedIfNeeded,
29         * 进而调用到这里的initChannel方法
30         */
31        @Override
32        public void initChannel(final Channel ch) throws Exception {
33            final ChannelPipeline pipeline = ch.pipeline();
34            ChannelHandler handler = config.handler();
35            if (handler != null) {
36                pipeline.addLast(handler);
37            }
38
39            // 这里注册一个添加ServerBootstrapAcceptor的任务
40            ch.eventLoop().execute(new Runnable() {
41                @Override
42                public void run() {
43                    // 添加ServerBootstrapAcceptor
44                    pipeline.addLast(new ServerBootstrapAcceptor(
45                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
46                }
47            });
48        }
49    });
50}

channel初始化之后就该将其注册到selector,即下面的register流程:

代码语言:javascript
复制
 1public ChannelFuture register(Channel channel) {
 2    // next()挑选一个EventLoop,默认轮询选择某个NioEventLoop
 3    return next().register(channel);
 4}
 5public ChannelFuture register(final ChannelPromise promise) {
 6    promise.channel().unsafe().register(this, promise);
 7    return promise;
 8}
 9// AbstractChannel
10public final void register(EventLoop eventLoop, final ChannelPromise promise) {
11    AbstractChannel.this.eventLoop = eventLoop;
12
13    // 直接执行register0或者以任务方式提交执行
14    // 启动时,首先执行到这里的是main线程,所以是以任务的方式来提交执行的。
15    // 也就是说,该任务是NioEventLoop第一次执行的任务,即调用register0
16    if (eventLoop.inEventLoop()) {
17        register0(promise);
18    } else {
19        // 往NioEventLoop中(任务队列)添加任务时,如果NioEventLoop线程还未启动,则启动该线程
20        eventLoop.execute(new Runnable() {
21            @Override
22            public void run() {
23                register0(promise);
24            }
25        });
26    }
27}

register操作

register操作之后伴随着多个回调及listener的触发:

代码语言:javascript
复制
 1// AbstractChannel$AbstractUnsafe
 2private void register0(ChannelPromise promise) {
 3    boolean firstRegistration = neverRegistered;
 4    // 这里调用的是AbstractNioChannel.doRegister
 5    // 这里将channel注册上去,并没有关注对应的事件(read/write事件)
 6    doRegister();
 7    neverRegistered = false;
 8    registered = true;
 9
10    // 调用handlerAdd事件,这里就会调用initChannel方法,设置channel.pipeline,也就是添加 ServerBootstrapAcceptor
11    pipeline.invokeHandlerAddedIfNeeded();
12
13    // 调用operationComplete回调
14    safeSetSuccess(promise);
15    // 回调fireChannelRegistered
16    pipeline.fireChannelRegistered();
17    // Only fire a channelActive if the channel has never been registered. This prevents firing
18    // multiple channel actives if the channel is deregistered and re-registered.
19    if (isActive()) {
20        if (firstRegistration) {
21            // 回调fireChannelActive
22            pipeline.fireChannelActive();
23        } else if (config().isAutoRead()) {
24            beginRead();
25        }
26    }
27}

上面代码中的initChannel回调也就是设置对外监听channel的channelHanlder为ServerBootstrapAcceptor;operationComplete回调也就是触发ChannelFutureListener.operationComplete,这里会进行后续的doBind操作。

代码语言:javascript
复制
 1// AbstractBootstrap
 2private static void doBind0(
 3        final ChannelFuture regFuture, final Channel channel,
 4        final SocketAddress localAddress, final ChannelPromise promise) {
 5    // doBind0向EventLoop任务队列中添加一个bind任务来完成后续操作。
 6    channel.eventLoop().execute(new Runnable() {
 7        @Override
 8        public void run() {
 9            if (regFuture.isSuccess()) {
10                // bind操作
11                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
12            }
13        }
14    });
15}

bind操作

在回顾上面的bind操作代码,bind操作是在register之后进行的,因为register0是由NioEventLoop执行的,所以main线程需要先判断下future是否完成,如果完成直接进行doBind即可,否则添加listener回调进行doBind。

bind操作及后续初始化操作(channelActive回调、设置监听事件)

代码语言:javascript
复制
 1public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 2    boolean wasActive = isActive();
 3    try {
 4        // 调用底层bind操作
 5        doBind(localAddress);
 6    } catch (Throwable t) {
 7        safeSetFailure(promise, t);
 8        closeIfClosed();
 9        return;
10    }
11
12    if (!wasActive && isActive()) {
13        invokeLater(new Runnable() {
14            @Override
15            public void run() {
16                pipeline.fireChannelActive();
17            }
18        });
19    }
20    safeSetSuccess(promise);
21}
22
23// 最后底层bind逻辑bind入参包括了backlog,也就是底层会进行listen操作
24// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel
25protected void doBind(SocketAddress localAddress) throws Exception {
26    if (PlatformDependent.javaVersion() >= 7) {
27        javaChannel().bind(localAddress, config.getBacklog());
28    } else {
29        javaChannel().socket().bind(localAddress, config.getBacklog());
30    }
31}
32
33public void channelActive(ChannelHandlerContext ctx) throws Exception {
34    // 回调fireChannelActive
35    ctx.fireChannelActive();
36
37    // 设置selectKey监听事件,对于监听端口就是SelectionKey.OP_ACCEPT,对于新建连接就是SelectionKey.OP_READ
38    readIfIsAutoRead();
39}

到这里为止整个netty启动流程就基本接近尾声,可以对外提供服务了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • server端启动流程
    • EventLoopGroup创建
    • ServerBootstrap流程
      • bind操作
        • channel初始化、注册操作
          • register操作
            • bind操作
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档