前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第十四节 netty源码分析之客户端源码分析01

第十四节 netty源码分析之客户端源码分析01

作者头像
用户1418372
发布2019-02-26 11:17:23
6050
发布2019-02-26 11:17:23
举报
文章被收录于专栏:清晨我上码

首先分析netty客户端,源码来自netty官方example

代码语言:javascript
复制
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            //初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
             .channel(NioSocketChannel.class)
                    //将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            //包含channel(注意channel可抽象为socket链接来理解)实例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最终channelFactory.newChannel();这里的factory就是前面设置的ReflectiveChannelFactory)
           //  NioSocketChannel最终newSocket 来打开一个新的 Java NIO SocketChannel, 最后调用父类AbstractChannel(Channel parent)
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
  • 分析重点,1、NioSocketChannel初始化。2、handler的添加。3、

首先group(group)将我们创建的NioEventLoopGroup作为客户端的eventLoopGroup,

接着.channel(NioSocketChannel.class);会初始化一个ChannelFactory用于,目的是为后续创建channel,源码如下

代码语言:javascript
复制
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

上述ReflectiveChannelFactory包含属性clazz就是用于后续创建对象

那么channel是何时创建又是何时注册呢?下面继续分析

思考,channel创建意味这什么?

代码语言:javascript
复制
netty的底层实现为java 的nio 前面文章已经介绍过。很明显netty的channel和nio中的channel其实是一类的。
那么客户端创建channel其实就是和服务端链接。因此我们分析下面的connect方法来验证一下

ChannelFuture f = b.connect(HOST, PORT).sync();

一路追踪找到下面这段代码

代码语言:javascript
复制
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        //创建channel和注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            //建立链接
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
  • 上述中最为重要的两个方法,一个initAndRegister 用于channel创建和注册。一个是doResolveAndConnect0用户channel于连接
initAndRegister 源码分析:
代码语言:javascript
复制
//  实例化channel和 channel 的注册过程
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
        //创建channel
            channel = channelFactory.newChannel();
            //初始
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
//1、config()方法为子类Bootstrap实现返回BootstrapConfig对象。
//2、BootstrapConfig中group()其实调用的为其父类 AbstractBootstrapConfig中的group()方法,该方法中返回bootstrap.group()即bootstrap中我们添加的group
//3、也就是最终调用NioEventLoopGroup的父类MultithreadEventExecutorGroup的register方法。该方法返回 next().register(channel);
//4、 next()方法MultithreadEventExecutorGroup的父类MultithreadEventExecutorGroup实现的。该方法返回  chooser.next(); 这里的
//这里的chooser是DefaultEventExecutorChooserFactory由方法chooserFactory.newChooser(children)返回;可参考(NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup类的构造器this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);)
//我们还记得最初NioEventLoopGroup的构造器最终会调用MultithreadEventExecutorGroup的构造器MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
// 在构造器中有一个children[i] = newChild(executor, args);方法格外引起我们的注意。因为这里的前面的next方法返回children数组中的值,newChild方法的实现类在NioEventLoopGroup中
//它返回return new NioEventLoop(this, executor, (SelectorProvider) args[0],
// ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);最后NioEventLoop的父类SingleThreadEventLoop 父类中的注册方法register,该方法调用到第四步
// 5、promise.channel().unsafe().register(this, promise);获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register.
 //6、在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel。AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
//7、AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
        ChannelFuture regFuture = config().group().register(channel);//注册
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
  1. 先分析创建,就是利用ReflectiveChannelFactory以及最初我们添加NioSocketChannel.class 来创建新的对象。

在分析init方法前,我们先简要分析下NioSocketChannel创建过程都做了哪些见不得人的事。 第一步先找到构造器:

代码语言:javascript
复制
 /**
     * Create a new instance
     */
    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

这里的DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); 至于SelectorProvider的作用前面文章介绍过,会根据同的操作系统选择不同的io处理方式 继续跟踪代码会构造器中会掉用newSocket发挥java nio中的SocketChannel

代码语言:javascript
复制
 public NioSocketChannel(Channel parent, SocketChannel socket) {
 //掉用父类
        super(parent, socket);
        //config封装channle和连接信息
        config = new NioSocketChannelConfig(this, socket.socket());
    }

一直追踪就会找到下面的代码

代码语言:javascript
复制
  protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        //创建unsafe
        unsafe = newUnsafe();
        //创建pipeline
        pipeline = newChannelPipeline();
    }

总结分析:创建channel后同时会创建pipeline,并创建操作底层nio的unsafe,补充如下

代码语言:javascript
复制
NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新Java NIO SocketChannel。 AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:parent 属性置为 null
unsafe 通过newUnsafe() 实例化一个unsafe对象, 它的类型是AbstractNioByteChannel.NioByteUnsafe 内部类pipeline 是 new DefaultChannelPipeline(this) 新创建的实例. 这里体现了:Each channel has its own pipeline and it is created automatically when a new channel is created.
AbstractNioChannel 中的属性:
        SelectableChannel ch 被设置为 Java SocketChannel, 即 NioSocketChannel#newSocket 返回的 Java NIO SocketChannel.
        readInterestOp 被设置为 SelectionKey.OP_READ
        SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
NioSocketChannel 中的属性:
        SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
        channel = channelFactory.newChannel();
        init(channel);
  1. 分析初始化init方法的作用(实现在子类Bootstrap中)
代码语言:javascript
复制
void init(Channel channel) throws Exception {
  //pipeline
        ChannelPipeline p = channel.pipeline();
        // config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            //设置channel类型
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

所以上面的初始化就很明显了,重点是p.addLast(config.handler())就是将最初ChannelInitializer这个handler添加到pipeline 并且设置channel的类型,option0方法返回就是我们最初设定的ChannelOption.TCP_NODELAY

  1. 接着我们分析initAndRegister。 的另外一个重要的方法config().group().register(channel);他的作用是channel注册

类中注释基本已经说明,这里简要分析,直接找到ioEventLoopGroup的父类MultithreadEventExecutorGroup的register方法

代码语言:javascript
复制
@Override
    public ChannelFuture register(Channel channel) {
        //next()方法返回为executor数组中的一个executor。这里NioEventLoop中的register(channel)调用的是其父类SingleThreadEventLoop的方法,掉用
        return next().register(channel);
    }

再次找到MultithreadEventExecutorGroup的父类MultithreadEventExecutorGroup找的next方法

代码语言:javascript
复制
 @Override
    public EventExecutor next() {
        //chooserFactory.newChooser(children);
        return chooser.next();
    }

这里的chooser是DefaultEventExecutorChooserFactory由方法chooserFactory.newChooser(children)返回; 参考(NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup类的构造器

代码语言:javascript
复制
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
//创建一个大小为 nThreads 的 EventExecutor数组
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //newChild的实现类在NioEventLoopGroup中,返回NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
//从DefaultEventExecutorChooserFactory工厂实现类中的newChooser方法: 根据线程数在children 数组中选出一个合适的 EventExecutor 实例
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

这个构造器很长,我们关心的在chooser = chooserFactory.newChooser(children);从DefaultEventExecutorChooserFactory工厂实现类中的newChooser方法: 根据线程数在children 数组中选出一个合适的 EventExecutor 实例。 根据代码中children = new EventExecutor[nThreads];和children[i] = newChild(executor, args);newChild的实现类在NioEventLoopGroup中,返回NioEventLoop 所以这里使用的数组中的对象NioEventLoop

代码语言:javascript
复制
public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
            //return executors[idx.getAndIncrement() & executors.length - 1];
        } else {
            return new GenericEventExecutorChooser(executors);
            //return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

那么上面的initAndRegister中的方法config().group().register(channel);最后掉用的为NioEventLoop重的register0

代码语言:javascript
复制
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
        try {
            ch.register(unwrappedSelector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }

上面代码我们在java NIO使用很常见的注册,更多细节可查看NioEventLoop类源码 补充:

代码语言:javascript
复制
EventLoopGroup(包含java nio中的selector) 的初始化过程
    EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池
    如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2
    MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组
    抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
NioEventLoop 属性:
        SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
        Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

连接以及handler

另外一个很重要的连接过程doResolveAndConnect0分析

源码在bootstrap类中:

代码语言:javascript
复制
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            //其他省略
        return promise;
    }

上面我们最关心的doConnect方法

代码语言:javascript
复制
 private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

继续查看channel的channel.connect方法。connect的实现在我们niosocketChannel的父类AbstractChannel中

代码语言:javascript
复制
 @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }

最终调用pipeline终的connect(前面以及说了创建channel也会创建一个pipeline这里DefaultChannelPipeline)

代码语言:javascript
复制
 @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

看到tail可以猜测这个connect事件对于客户端应该属于写操作也就是outbound事件,后续会详细介绍

先看下pepeline的构造器(可以看到pipeline是一个有AbstractChannelHandlerContext类型的双向链表)

代码语言:javascript
复制
 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

那么这个connect最后就会到tail中,也就是AbstractChannelHandlerContext中实现的connect(相关注释已添加上)

代码语言:javascript
复制
 @Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
// DefaultChannelPromiseelPipeline 内的双向链表的 tail 开始, 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect
        final AbstractChannelHandlerContext next = findContextOutbound();
        //next其实是找到headContext  unsafe.connect(remoteAddress, localAddress, promise);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            //当我们找到了一个 outbound 的 Context 后, 就调用它的 invokeConnect 方法, 这个方法中会调用 Context 所关联着的 ChannelHandler 的 connect 方法:
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

最初返回的next为TailContext然后调用invokeConnect方法(这个方法tailContext没有重写,所以调用父类AbstractChannelHandlerContext的方法)

代码语言:javascript
复制
 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                //pipeline.channel().unsafe();
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

已知connect中findContextOutbound会从: tail -> 自定义outboundhandler->head 这个调用过程 tail没有实现connect,又会调用父类的AbstractChannelHandlerContext的connect就是上面那个方法,然后继续向前寻找,如果我们没有实现connect方法, 则继续向前知道找到headContext,它实现了connect方法,重点看这个方法,是如何和服务端连接的

查看HeadContext的connect代码

代码语言:javascript
复制
  @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

** unsafe.connect 中unsafe为操作nio底层的对象,细节后续说明。需要知道的是这里最终调用java NIO的socketChannel.connect(remoteAddress)**

下一节介绍handler是如何添加的 总的来说, Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联, 因此这也体现了, 在 Netty 中, 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的; 当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中. 通过这两步, 就完成了 Netty Channel 的注册过程.

图片.png

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.01.25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 首先分析netty客户端,源码来自netty官方example
    • 首先group(group)将我们创建的NioEventLoopGroup作为客户端的eventLoopGroup,
      • initAndRegister 源码分析:
      • 另外一个很重要的连接过程doResolveAndConnect0分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档