专栏首页TopCoderNetty 黏包拆包机制

Netty 黏包拆包机制

编者注:学习netty处理黏包和拆包,首先要知道什么是黏包和拆包问题?

黏包和拆包的产生是由于TCP拥塞控制算法(比如angle算法)和TCP缓冲区机制导致的,angle算法简单来说就是通过一些规则来尽可能利用网络带宽,尽可能的发送足够大的数据。TCP(发送/接收)缓冲区会暂缓数据,并且是有最大容量的。

黏包的产生是由于一次TCP通信数据量较少,导致多个TCP数据合并在一起(这里的合并可能发生在发送缓冲区合并后发送,也可能发生在接收缓冲区合并后应用程序一次性读取)。拆包的产生是由于一次TCP通信数据量较大(比如超过了MTU),导致发送时分片发送,这样接收时是多次接收后才是一个完整的数据。

netty处理黏包和拆包问题,思路就是以定长方式读取接收到的数据来处理(比如读取固定长度数据量、以TLV方式读取数据、以固定分隔符读取数据等)

下面以一个定长方式读取数据的示例来分析下Netty的处理机制,server端处理TCP黏包和拆包示例代码:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap boot = new ServerBootstrap();
    boot.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(60000)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // 定长接收消息,用于处理黏包分包问题
                            .addLast(new FixedLengthFrameDecoder(1))
                            .addLast(new EchoHandler());
                }
            });

    // start
    ChannelFuture future = boot.bind().sync();
    future.channel().closeFuture().sync();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // shutdown
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

FixedLengthFrameDecoder类继承关系如下,下面就以该类为例讲解下netty 处理粘包分包机制。

这里思考一下,如果接收到的数据未达到FixedLengthFrameDecoder要求的长度,这个时候Netty该如何处理呢?

首先可以肯定的一点是,接收到数据长度不够,是不会进行后续channelHandler处理的。Netty的处理机制是会将接收到的数据存储到ByteToMessageDecoder.cumulation中,暂存一下,等待下次接收到数据时继续处理直到达到要求长度之后才交给后续的ChannelHandler来处理,ByteToMessageDecoder代码如下:

// ByteToMessageDecoder
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                // cumulation保存有上次未处理(不完整)的数据
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // 继续回调channelHandler.channelRead
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            handlerRemoved(ctx);
        }
    }
}
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}
// 已接收数据未达到要求的长度时,继续等待接收
protected Object decode(
        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
        return in.readRetainedSlice(frameLength);
    }
}

处理粘包拆包,其实思路都是一致的,就是“分分合合”,粘包由于数据过多,那就按照固定策略分割下交给程序来处理;拆包由于一次传输数据较少,那就等待数据传输长度够了再交给程序来处理。

本文分享自微信公众号 - TopCoder(gh_12e4a74a5c9c)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 深入分析 Session 和 Cookie

    在Web发展史中,我们知道浏览器与服务器间采用的是 http协议,而这种协议是无状态的,所以这就导致了服务器无法知道是谁在浏览网页,但很明显,一些网页需要知道用...

    用户1516716
  • 【译】设计优雅的 Rust 库 API

    原文见:https://deterministic.space/elegant-apis-in-rust.html

    MikeLoveRust
  • 一种无法被Dump的jar包加密保护解决方案

    由于Java的指令集比较简单而通用,较容易得出程序的语义信息,Java编译后的Jar包和Class文件,可以轻而易举的使用反编译工具(如JD-GUI)进行反编译...

    我是小三
  • 【DB笔试面试690】在Oracle中,什么是分布式事务处理?

    现代数据库系统往往伴随着复杂的结构和环境,其中,分布式数据库组成是一个重要方面。系统后台的数据库系统不再是由单个数据库构成,而是由多台独立数据库、甚至是多台异构...

    小麦苗DBA宝典
  • Apache RocketMQ原理(1)——服务端组件介绍

    获取上云帮助文档:http://rocketmq.cloud/zh-cn/blog/tocloud-catalog.html

    用户5224393
  • 谷歌软件工程师:我为什么喜欢用Go语言?

    Go语言最近几年逐渐获得越来越多的开发者的喜欢。在Go社区前不久刚刚庆祝Go诞生10周年生日之际,谷歌云软件工程师Benjamin Congdon发表个人博客,...

    新智元
  • 灵魂拷问:Java 的 substring() 是如何工作的?

    在逛 programcreek 的时候,我发现了一些小而精悍的主题。比如说:Java 的 substring() 方法是如何工作的?像这类灵魂拷问的主题,非常值...

    沉默王二
  • 聊聊rocketmq的consumeTimeout

    rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQ...

    codecraft
  • Java网络编程与NIO详解11:Tomcat中的Connector源码分析(NIO)

    本文是微信公众号【Java技术江湖】的《不可轻视的Java网络编程》其中一篇,本文部分内容来源于网络,为了把本文主题讲得清晰透彻,也整合了很多我认为不错的技术博...

    Java技术江湖
  • 聊聊rocketmq的adjustThreadPoolNumsThreshold

    本文主要研究一下rocketmq的adjustThreadPoolNumsThreshold

    codecraft

扫码关注云+社区

领取腾讯云代金券