前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty源码分析一之NioServerSocketChannel

netty源码分析一之NioServerSocketChannel

作者头像
山行AI
发布2019-10-21 16:17:59
6230
发布2019-10-21 16:17:59
举报
文章被收录于专栏:山行AI山行AI

netty是基于nio的基础上包装和拓展的,想要了解netty就先要了解它与nio之间的联系,那么NioServerSocketChannel就是首先要了解的类

先看一段nio编程的示例

代码语言:javascript
复制
int port = 9999;
            ServerSocketChannel serverSocketChannel = null;
            Selector selector = null;
            InetSocketAddress localAddress = new InetSocketAddress(port);
            Random rnd = new Random();

            try {
                /*创建选择器*/
                selector = Selector.open();

                /*创建服务器通道*/
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);

                /*设置监听服务器的端口,设置最大连接缓冲数为100*/
                serverSocketChannel.bind(localAddress, 100);

                /*服务器通道只能对tcp链接事件感兴趣*/
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            } catch (IOException e1) {
                return;
            }
            /*服务器线程被中断后会退出*/
            try{

                while(selector.select() > 0){
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){

                        key = it.next();
                        /*防止下次select方法返回已处理过的通道*/
                        it.remove();

                        /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/
                        try{
                            /*serverSocketChannel通道只能对链接事件感兴趣*/
                            if(key.isAcceptable()){

                                /*accept方法会返回一个普通通道,
                                     每个通道在内核中都对应一个socket缓冲区*/
                                SocketChannel sc = serverSocketChannel.accept();
                                sc.configureBlocking(false);

                                /*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/
                                int interestSet = SelectionKey.OP_READ;
                                sc.register(selector, interestSet, new Buffers(256, 256));
                            }


                            /*(普通)通道感兴趣读事件且有数据可读*/
                           if(key.isReadable()){

                               /*通过SelectionKey获取通道对应的缓冲区*/
                               Buffers  buffers = (Buffers)key.attachment();
                               ByteBuffer readBuffer = buffers.getReadBuffer();
                               ByteBuffer writeBuffer = buffers.gerWriteBuffer();

                               /*通过SelectionKey获取对应的通道*/
                               SocketChannel sc = (SocketChannel) key.channel();

                               /*从底层socket读缓冲区中读入数据*/
                               sc.read(readBuffer);
                               readBuffer.flip();

                                /*设置通道写事件*/
                                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);       
                               ...........

我们来看下上面的几步:

1.创建选择器

代码是:

代码语言:javascript
复制
selector = Selector.open();

Selector.open对应的代码是:

代码语言:javascript
复制
public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

是通过SelectorProvider来打开selector的

2. 创建服务器通道

代码是:

代码语言:javascript
复制
serverSocketChannel = ServerSocketChannel.open();

ServerSocketChannel.open()对应的代码是:

代码语言:javascript
复制
public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

这里调用的也是SelectorProvider.provider()的openServerSocketChannel方法。

3. 设置监听服务器的端口

代码是:

代码语言:javascript
复制
serverSocketChannel.bind(localAddress, 100)

4. 注册

代码是:

代码语言:javascript
复制
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

在通道上对感兴趣的key进行注册。最后调用的是java.nio.channels.SelectableChannel.register(java.nio.channels.Selector, int)

5. select

代码语言:javascript
复制
while(selector.select() > 0){
    Set<SelectionKey> keySet = selector.selectedKeys();
    Iterator<SelectionKey> it = keySet.iterator();
    SelectionKey key = null;
    while(it.hasNext()){

        key = it.next();
        /*防止下次select方法返回已处理过的通道*/
        it.remove();

        /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/
        try{
            /*serverSocketChannel通道只能对链接事件感兴趣*/
            if(key.isAcceptable()){

                /*accept方法会返回一个普通通道,
                     每个通道在内核中都对应一个socket缓冲区*/
                SocketChannel sc = serverSocketChannel.accept();
                sc.configureBlocking(false);

                /*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/
                int interestSet = SelectionKey.OP_READ;
                sc.register(selector, interestSet, new Buffers(256, 256));
                .........
  • 前面在selector上注册了serverSocketChannel的accept事件。
  • 使用selector进行select轮询,轮询到accept事件时遍历selector上之前注册过的每一个selectedKeys,然后对自己感兴趣的key进行处理。
  • 通过serverSocketChannel.accept()获取当前连接的客户端的普通通道socketChannel,并在selector上注册该socketChannel的OPREAD事件
  • 当客户端往服务端写数据时,需要在服务端注册read事件,这样下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。
代码语言:javascript
复制
/*(普通)通道感兴趣读事件且有数据可读*/
   if(key.isReadable()){

       /*通过SelectionKey获取通道对应的缓冲区*/
       Buffers  buffers = (Buffers)key.attachment();
       ByteBuffer readBuffer = buffers.getReadBuffer();
       ByteBuffer writeBuffer = buffers.gerWriteBuffer();

       /*通过SelectionKey获取对应的通道*/
       SocketChannel sc = (SocketChannel) key.channel();

       /*从底层socket读缓冲区中读入数据*/
       sc.read(readBuffer);
       readBuffer.flip();

        /*设置通道写事件*/
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
  • SocketChannel:通过TCP读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
  • 一个Selector同时管理多个Socket,多个客户端连接过来时,每个客户端都有自己对应的SocketChannel。ServerSocketChannel也是在Selector上注册的 但是一般注册的是accept事件,具体的读写都是各自的SocketChannel自己注册管理的。

netty中的NioServerSocketChannel

netty server的一般写法:

代码语言:javascript
复制
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new WebSocketServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();

            System.out.println("Open your web browser and navigate to " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

1. ServerBootStrap的bind方法

代码语言:javascript
复制
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress):
 /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

doBind方法

代码语言:javascript
复制
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
..........

initAndRegister()

代码语言:javascript
复制
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            System.out.println("channel:" + channel);
            init(channel);

其中channelFactory的创建位置在bootstrap.channel方法即io.netty.bootstrap.AbstractBootstrap#channel方法里,具体代码如下:

代码语言:javascript
复制
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

channelFactory.newChannel()

具体调用的是:io.netty.channel.ReflectiveChannelFactory#newChannel

最终构造的是:

代码语言:javascript
复制
/**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

这里的newSocket方法是:

代码语言:javascript
复制
private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

返回的是一个ServerSocketChannel对象。

调用重载的构造方法是:

代码语言:javascript
复制
/**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

NioServerSocketChannelConfig类是netty使用过程中比较重要的配置类。继续看super方法:

代码语言:javascript
复制
/**
     * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
     */
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

这里传入的parent为null,ch为ServerSocketChannel,readInterestOp为SelectionKey.OP_ACCEPT。继续往上看:

代码语言:javascript
复制
/**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
  • parent为之前传入的null
  • ch为ServerSocketChannel
  • readInterestOp为SelectionKey.OP_ACCEPT
  • ch.configureBlocking(false)将channel配置成非阻塞

继续往下看:

代码语言:javascript
复制
/**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
  • id=newId()是用来生成channel的唯一标识channelId的:
代码语言:javascript
复制
protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
    private DefaultChannelId() {
            data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
            int i = 0;

            // machineId
            System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
            i += MACHINE_ID.length;

            // processId
            i = writeInt(i, PROCESS_ID);

            // sequence
            i = writeInt(i, nextSequence.getAndIncrement());

            // timestamp (kind of)
            i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

            // random
            int random = PlatformDependent.threadLocalRandom().nextInt();
            i = writeInt(i, random);
            assert i == data.length;

            hashCode = Arrays.hashCode(data);
        }
  • newUnsafe()对应的是io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe方法:
代码语言:javascript
复制
@Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

对应的方法为:

它的父类中有相当多的方法,我们来看一下:

这些方法在后面源码分析中会来一一讲解它们的用途。

NioServerSocketChannel介绍

一些方法:

代码语言:javascript
复制
@Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

    @Override
    protected SocketAddress localAddress0() {
        return SocketUtils.localSocketAddress(javaChannel().socket());
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

    @Override
    protected void doClose() throws Exception {
        javaChannel().close();
    }

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
  • javaChannel()返回的是jdk的ServerSocketChannel
  • doBind()内部是通过jdk的ServerSocketChannel进行bind操作的
  • doReadMessages()的SocketChannel ch = SocketUtils.accept(javaChannel())内部实现代码为:
代码语言:javascript
复制
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

实际上是通过serverSocketChannel.accept()返回一个SocketChannel,可以与上面的nio编程进行类比。

总结

NioServerSocketChannel实际上是对nio的ServerSocketChannel的一些方法和使用进行了封装。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 先看一段nio编程的示例
    • 1.创建选择器
      • 2. 创建服务器通道
        • 3. 设置监听服务器的端口
          • 4. 注册
            • 5. select
            • netty中的NioServerSocketChannel
              • 1. ServerBootStrap的bind方法
                • doBind方法
                  • initAndRegister()
                    • channelFactory.newChannel()
                    • NioServerSocketChannel介绍
                    • 总结
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档