前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty服务端的新连接接入源码解析

Netty服务端的新连接接入源码解析

作者头像
止术
发布2021-08-06 14:28:54
3810
发布2021-08-06 14:28:54
举报
文章被收录于专栏:求道求道

经过上一章节的学习,我们基本了解了Netty是如何对IO事件以及异步任务的处理了,今天我们就一起来学习一下,Netty是如何处理新连接接入与数据读取的!

一、源码寻找

我们上一章节学到了,当存在IO事件的时候,Netty的反应堆线程会监听这些事件,然后进行处理,忘记的,可以回顾一下上一章节,,我们这里直接进入到:

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

这里的代码,我们昨天只是做了一个大概的分析,并没有深入的讲解,这一章节具体分析一下新连接的接入和Channel数据的读取。

代码语言:javascript
复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    .........忽略有效性验证................

    try {
        int readyOps = k.readyOps();
        ...................忽略其他 事件的处理逻辑...............
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

我们重点关注当事件存在读事件或者新连接接入事件的时候,才会进入到这一判断逻辑,那么由此可见,我们兵丁是要关注unsafe.read() 这一行代码了!

二、新连接接入源码分析

首选,我们声明一下,我们现在一直是按照服务端启动逻辑进行分析的,那么服务端逻辑分析,对照通道就是NioServerSocketChannel, 我们在创建NioServerSocketChannel的时候初始化过一个Unsafe对象,他是NioMessageUnsafe类型的,如果有疑问的同学可以回顾一下NioServerSocketChannel的初始化过程!

所以,必然,我们这里的unsafe.read(); 就必然进入的是NioMessageUnsafe的read方法:

image-20210504115544746

代码语言:javascript
复制
@Override
public void read() {
    ..................忽略不必要代码............
    try {
        try {
            do {
                //读取数据  可能是数据  也可能是新连接
                int localRead = doReadMessages(readBuf);
                //如果没数据就跳出
                if (localRead == 0) {
                    break;
                }
                //-1 就是连接被关闭 
                if (localRead < 0) {
                    closed = true;
                    break;
                }
				//读取的连接数增加
                allocHandle.incMessagesRead(localRead);
                //每次默认读取最大16个连接  剩余的后续去读
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
		//获取连接数量或者读取的数据的数量
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //开始传播channelRead属性
            pipeline.fireChannelRead(readBuf.get(i));
        }
        //清空缓冲区
        readBuf.clear();
        allocHandle.readComplete();
        //传播读取完成事件
        pipeline.fireChannelReadComplete();
        ...................忽略不必要代码.......................
    } finally {
        ...................忽略不必要代码.......................
    }
}

1. 读取新连接

代码语言:javascript
复制
int localRead = doReadMessages(readBuf);

这行代码是读取新连接的主要逻辑:

image-20210504181959983

代码语言:javascript
复制
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    //调用JDK ServerSocketChannel获取新连接  JDK SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            //将客户端连接直接包装为 Netty的管道包装对象 NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        ................忽略异常处理.............
    }
    return 0;
}

可以看到这里的逻辑比较简单,首先,Netty会使用先前保存的JDK 的原生的SocketChannel调用accept方法进行获取JDK新连接的管道!

注意此时获取的管道是JDK NIO的原生的管道对象,和Netty还没有关系,然后再将JDK NIO原生的Channel包装为Netty的NioSocketChannel放到缓冲区里面,注意此时放到缓冲区里面的对象就是Netty的包装对象了!包装完成之后直接返回 ,此时我们的缓冲区就存在数据量,这个数据是NioSocketChannel对象!

我们回到主线 read方法,当调用完doMessage方法之后开始就要处理这个NioSocketChannel了呀!

2. 处理新连接的管道

代码语言:javascript
复制
pipeline.fireChannelRead(readBuf.get(i));

从代码上看,可以看到,他是把刚刚我们读到的NioSocketChannel出来往下传播,这个代码是在通道内传播,我们前几节课讲过,此时Pipeline的结构是如图所示的数据结构:

image-20210504194847065

我们看如下代码:

代码语言:javascript
复制
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

他是从头节点开始传播的,channelRead的传播是自上而下的,所以就势必会传播到 ServerBootstrapAcceptor的逻辑中,所以我们进入到ServerBootstrapAcceptor#channelRead方法:

代码语言:javascript
复制
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //能进入都这一段逻辑的就必定是通道对象,因为只有服务端管道会存在该处理器
    final Channel child = (Channel) msg;
	//向服务端管道追加childHandler   在构建ServerBootStrap的时候传入的
    child.pipeline().addLast(childHandler);
	//在构建ServerBootStrap的时候传入的
    setChannelOptions(child, childOptions, logger);
	//在构建ServerBootStrap的时候传入的
    setAttributes(child, childAttrs);
    try {
        //开始进行注册,注册逻辑同NioServerSocketChannel相同
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

我们可以看到这个,先是向通道内注册一些客户端的参数,然后开始进行注册Channel, 注册的时候同NioServerSocketChannel的注册逻辑一样,只不过NioSocketChannel的关注事件是OP_READ事件,这里留一个作业,同学们可以自己分析一下NioSocketChannel的创建,分析一下它的注册逻辑与反应堆逻辑!

三、客户端数据读取源码解析

我们还是直接回到

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

代码语言:javascript
复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    .........忽略有效性验证................

    try {
        int readyOps = k.readyOps();
        ...................忽略其他 事件的处理逻辑...............
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

上面分析过,负责客户端新连接的通道是NioSocketChannel,大家自行分析一下内部逻辑,与NioServerSocketChannel的相似度90%!

1. 读取通道数据

NioSocketChannel的Unsafe是 NioByteUnsafe, 所以我们直接进入到:

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

查看他是如何进行数据读取的:

代码语言:javascript
复制
@Override
public final void read() {
    ........................忽略........................
    //获取客户端通道管道
    final ChannelPipeline pipeline = pipeline();
    //获取一个内存分配器
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            //分配一个ByteBuf缓冲区
            byteBuf = allocHandle.allocate(allocator);
            //开始向缓冲区内写入通道的数据
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // 如果没有读取到缓冲区,就释放缓冲区.
                byteBuf.release();
                byteBuf = null;
                //设置关闭标志
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            //传播一次readChnnel事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        //传播一次readChnnelComplete事件 
        pipeline.fireChannelReadComplete();
		//关闭通道
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        ....................忽略不必要代码.....................
    }
}

我们整体将逻辑分为以下几个步骤:

  1. 获取一个内存分配器,Netty中存在一个专门用于分配ByteBuf的内存分配器。这里是将它获取出来!
  2. 使用上一步获取的内存分配器分配一块缓冲区,用域后续的使用!
  3. 开始读取通道内的数据写入预先分配好的缓冲区!
  4. 读取数据完毕后,将带有数据的缓冲区调用pieline的传播方法进行数据的传播 readChannel方法!
  5. 当通道内的数据被处理完后,传播一次 channelReadComplete方法

四、总结

  1. 在Netty中NioServerSocketChannel与NioSocketChannel的处理中,对于数据的读取拥有不同的处理方法,NioServerSockerChannel主要用于处理新连接的,在初始化的时候就会在通道内加入一个新连接接入器ServerBootstrapAcceptor! NioServerSocketChannel对象在读取到数据后将之包装为NioSocketChannel对象,然后使用ServerBootstrapAcceptor进行NioSocketChannel的注册与启动反应堆线程!
  2. 当通道内存在数据的时候,被NioSockerChannel探测到后,就会先分配一块缓冲区,将数据读取进预先分配好的缓冲区,然后进行数据的向下通道流转(事件触发)!
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 源码学徒 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、源码寻找
  • 二、新连接接入源码分析
    • 1. 读取新连接
      • 2. 处理新连接的管道
      • 三、客户端数据读取源码解析
        • 1. 读取通道数据
        • 四、总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档