专栏首页TopCoderNetty 处理连接那些事

Netty 处理连接那些事

Netty的连接处理就是IO事件的处理,IO事件包括读事件、ACCEPT事件、写事件和OP_CONNECT事件。

IO事件的处理是结合ChanelPipeline来做的,一个IO事件到来,首先进行数据的读写操作,然后交给ChannelPipeline进行后续处理,ChannelPipeline中包含了channelHandler链(head + 自定义channelHandler + tail)。 使用channelPipeline和channelHandler机制,起到了解耦和可扩展的作用。一个IO事件的处理,包含了多个处理流程,这些处理流程正好对应channelPipeline中的channelHandler。如果对数据处理有新的需求,那么就新增channelHandler添加到channelPipeline中,这样实现很6,以后自己写代码可以参考。

说到这里,一般为了满足扩展性要求,常用2种模式:

  • 方法模板模式:模板中定义了各个主流程,并且留下对应hook方法,便于扩展。
  • 责任链模式:串行模式,可以动态添加链数量和对应回调方法。

netty的channelHandlerchannelPipeline可以理解成就是责任链模式,通过动态增加channelHandler可达到复用和高扩展性目的。

了解netty连接处理机制之前需要了解下NioEventLoop模型,其中处理连接事件的架构图如下:

对应的处理逻辑源码为:

 1// 处理各种IO事件
 2private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 3    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
 4
 5    try {
 6        int readyOps = k.readyOps();
 7        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
 8            // OP_CONNECT事件,client连接上客户端时触发的事件
 9            int ops = k.interestOps();
10            ops &= ~SelectionKey.OP_CONNECT;
11            k.interestOps(ops);
12            unsafe.finishConnect();
13        }
14
15        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
16            ch.unsafe().forceFlush();
17        }
18
19        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
20            // 注意,这里读事件和ACCEPT事件对应的unsafe实例是不一样的
21            // 读事件 -> NioByteUnsafe,  ACCEPT事件 -> NioMessageUnsafe
22            unsafe.read();
23        }
24    } catch (CancelledKeyException ignored) {
25        unsafe.close(unsafe.voidPromise());
26    }
27}

从上面代码来看,事件主要分为3种,分别是OP_CONNECT事件、写事件和读事件(也包括ACCEPT事件)。下面分为3部分展开:

ACCEPT事件

 1// NioMessageUnsafe
 2public void read() {
 3    assert eventLoop().inEventLoop();
 4    final ChannelConfig config = config();
 5    final ChannelPipeline pipeline = pipeline();
 6    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 7    allocHandle.reset(config);
 8
 9    boolean closed = false;
10    Throwable exception = null;
11    try {
12        do {
13            // 调用java socket的accept方法,接收请求
14            int localRead = doReadMessages(readBuf);
15            // 增加统计计数
16            allocHandle.incMessagesRead(localRead);
17        } while (allocHandle.continueReading());
18    } catch (Throwable t) {
19        exception = t;
20    }
21
22    // readBuf中存的是NioChannel
23    int size = readBuf.size();
24    for (int i = 0; i < size; i ++) {
25        readPending = false;
26        // 触发fireChannelRead
27        pipeline.fireChannelRead(readBuf.get(i));
28    }
29    readBuf.clear();
30    allocHandle.readComplete();
31    pipeline.fireChannelReadComplete();
32}

连接建立好之后就该连接的channel注册到workGroup中某个NIOEventLoop的selector中,注册操作是在fireChannelRead中完成的,这一块逻辑就在ServerBootstrapAcceptor.channelRead中。

 1// ServerBootstrapAcceptor
 2public void channelRead(ChannelHandlerContext ctx, Object msg) {
 3    final Channel child = (Channel) msg;
 4
 5    // 设置channel的pipeline handler,及channel属性
 6    child.pipeline().addLast(childHandler);
 7    setChannelOptions(child, childOptions, logger);
 8
 9    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
10        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
11    }
12
13    try {
14        // 将channel注册到childGroup中的Selector上
15        childGroup.register(child).addListener(new ChannelFutureListener() {
16            @Override
17            public void operationComplete(ChannelFuture future) throws Exception {
18                if (!future.isSuccess()) {
19                    forceClose(child, future.cause());
20                }
21            }
22        });
23    } catch (Throwable t) {
24        forceClose(child, t);
25    }
26}

READ事件

 1// NioByteUnsafe
 2public final void read() {
 3    final ChannelConfig config = config();
 4    final ChannelPipeline pipeline = pipeline();
 5    final ByteBufAllocator allocator = config.getAllocator();
 6    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
 7    allocHandle.reset(config);
 8
 9    ByteBuf byteBuf = null;
10    boolean close = false;
11    try {
12        do {
13            byteBuf = allocHandle.allocate(allocator);
14            // 从channel中读取数据,存放到byteBuf中
15            allocHandle.lastBytesRead(doReadBytes(byteBuf));
16
17            allocHandle.incMessagesRead(1);
18            readPending = false;
19
20            // 触发fireChannelRead
21            pipeline.fireChannelRead(byteBuf);
22            byteBuf = null;
23        } while (allocHandle.continueReading());
24
25        // 触发fireChannelReadComplete,如果在fireChannelReadComplete中执行了ChannelHandlerContext.flush,则响应结果返回给客户端
26        allocHandle.readComplete();
27        // 触发fireChannelReadComplete
28        pipeline.fireChannelReadComplete();
29
30        if (close) {
31            closeOnRead(pipeline);
32        }
33    } catch (Throwable t) {
34        if (!readPending && !config.isAutoRead()) {
35            removeReadOp();
36        }
37    }
38}

写事件

正常情况下一般是不会注册写事件的,如果Socket发送缓冲区中没有空闲内存时,再写入会导致阻塞,此时可以注册写事件,当有空闲内存(或者可用字节数大于等于其低水位标记)时,再响应写事件,并触发对应回调。

1if ((readyOps & SelectionKey.OP_WRITE) != 0) {
2    // 写事件,从flush操作来看,虽然之前没有向socket缓冲区写数据,但是已经写入到
3    // 了chnanel的outboundBuffer中,flush操作是将数据从outboundBuffer写入到
4    // socket缓冲区
5    ch.unsafe().forceFlush();
6}

CONNECT事件

该事件是client触发的,由主动建立连接这一侧触发的。

1if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
2    // OP_CONNECT事件,client连接上客户端时触发的事件
3    int ops = k.interestOps();
4    ops &= ~SelectionKey.OP_CONNECT;
5    k.interestOps(ops);
6
7    // 触发finishConnect事件,其中就包括fireChannelActive事件,如果有自定义的handler有channelActive方法,则会触发
8    unsafe.finishConnect();
9}

本文分享自微信公众号 - TopCoder(gh_12e4a74a5c9c)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 7 Papers | 2019安卓手机AI性能评测;谷歌T5预训练模型刷榜GLUE;自动驾驶论文综述

    论文 1:AI Benchmark: All About Deep Learning on Smartphones in 2019

    机器之心
  • NIO系列(一)——介绍和Buffer缓冲区

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    逝兮诚
  • 树莓派上利用 Tensorflow 实现小车的自动驾驶

    github传送门:https://github.com/Timthony/self_drive

    芋道源码
  • 一个故事讲清楚 NIO

    每来一个顾客,马上由一位职员来接待处理,并且这个职员需要负责以上4个完整流程。当超过10个顾客时,剩余的顾客需要排队等候。

    芋道源码
  • IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)

    在非阻塞模式中,发出Socket的 accept()和 read()操作时,如果内核中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个信息。也就是...

    未读代码
  • 说说Netty的线程模型

    最近发现极客时间的很多课程中,都穿插到了 Netty,可见 Netty 的重要性。基于此,给大家推荐一下这篇文章!

    好好学java
  • Quartus II和Nios II使用时遇到的错误及解决方法总结

    发现提示错误的器件是epcs,其实epcs是没有问题的。通过修改bsp的配置即可,Bsp右击-nios ii –bspeditor 取消选中所有选项即可。

    FPGA开源工作室
  • 面试官:为什么Java线程没有Running状态?

    Java虚拟机层面所暴露给我们的状态,与操作系统底层的线程状态是两个不同层面的事。具体而言,这里说的 Java 线程状态均来自于 Thread 类下的 Stat...

    用户4143945
  • TSN、智能驾驶和边缘计算有什么关系?

    昨天,一则新闻在时间确定性网络群里刷屏了:创时科技开业,上汽智能驾驶又进关键一步。对于时间触发以太网和TTTech公司,笔者曾在文章一个人,一个想法,一家公司和...

    网络交换FPGA
  • 天池中间件大赛 Dubbo Mesh 优化总结(QPS 从 1000 到 6850)

    天池中间件大赛的初赛在今早终于正式结束了,公众号停更了一个月,主要原因就是博主的空余时间几乎全花在这个比赛上,第一赛季结束,做下参赛总结,总的来说,收获不小。

    芋道源码

扫码关注云+社区

领取腾讯云代金券