前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty与TCP粘包拆包

Netty与TCP粘包拆包

作者头像
黑洞代码
发布2021-01-14 15:37:59
9430
发布2021-01-14 15:37:59
举报

Netty如何解决TCP粘包拆包的问题?

TCP粘包/拆包

TCP协议是个流协议,所谓流,就是指没有界限的一串数据。河里的流水,是连成一片的,没有分界线。TCP底层并不了解上层业务数据的具体意义,他会根据TCP缓冲区的实际情况进行包的划分,所以在业务上一个完整的包,有可能会被TCP拆分为多个包进行发送,也有可能把业务上多个小包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

粘包拆包说明

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象

第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

如果此时服务器端TCP接收窗口非常小,而数据包Packet1和Packet2比较大,很有可能会发生另一种情况——服务器分多次才能将Packet1和Packet2完全接收,期间会发生多次拆包。

粘包、拆包发生原因

1.要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包即应用程序写入数据的字节大小大于套接字发送缓冲区的大小。

2.进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度,待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

3.要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

4.接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

5.以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成若干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

TCP粘包拆包的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据不被拆包和重组的,这样问题需要通过上层的应用协议栈设计来解决。

1. 消息定长。例如100字节。如果不够,空位补空格。

2. 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议,发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

3. 将消息分为消息头和消息体。消息头中包含消息总长度的字段,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

4. 其它复杂的协议,如RTMP协议等。

未考虑TCP粘包拆包导致异常的案例

服务器端代码:

代码语言:javascript
复制
public class TimeServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new TimeServer().bind(port);
    }
}

服务器端处理类

代码语言:javascript
复制
public class TimeServerHandler extends ChannelHandlerAdapter {
    /**
     * 计数器
     */
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //类似NIO中的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        //获取缓冲区可读字节数
        byte[] req = new byte[buf.readableBytes()];
        //缓冲区中的字节复制到字节数组
        buf.readBytes(req);
        String body = new String(req).substring(0, req.length - System.getProperty("line.separator").length());
        if ("hello world".equalsIgnoreCase(body)) {
            System.out.println("收到输入:" + body);
        } else {
            System.out.println("异常输入:" + body);
        }
        ++counter;
        ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date()).getBytes());
        //并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
        ctx.write(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}

客户端代码

代码语言:javascript
复制
public class TimeClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new TimeClient().connect(port, "127.0.0.1");
    }
}

客服端处理类

代码语言:javascript
复制
public class TimeClientHandler extends ChannelHandlerAdapter {

    private byte[] req;

    public TimeClientHandler() {
        req = ("hello world" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] resp = new byte[buf.readableBytes()];
        buf.readBytes(resp);
        String body = new String(resp);
        System.out.println(body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

输出结果:

代码语言:javascript
复制
收到输入:hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hell
收到输入:o world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world

由于没有考虑TCP粘包拆包的问题,所以发生“收到输入:o world”的输出。

Netty提供的解决方案

LineBasedFrameDecoder和StringDecoder解码器的使用

优化后的服务器端代码

代码语言:javascript
复制
public class OptimizedTimeServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //LineBasedFrameDecoder解码器
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //StringDecoder解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new OptimizeTimeServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new OptimizedTimeServer().bind(port);
    }
}

优化后的服务器端处理类

代码语言:javascript
复制
public class OptimizeTimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("收到消息:" + body);
        ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + System.getProperty("line.separator")).getBytes());
        //并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}

优化后的客户端代码

代码语言:javascript
复制
public class OptimizeTimeClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new OptimizeTimeClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new OptimizeTimeClient().connect(port, "127.0.0.1");
    }
}

优化后的客户端处理类

代码语言:javascript
复制
public class OptimizeTimeClientHandler extends ChannelHandlerAdapter {

    private byte[] req;

    public OptimizeTimeClientHandler() {
        req = ("hello world" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println(body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

优化后的服务器端输出

代码语言:javascript
复制
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world

LineBasedFrameDecoder和StringDecoder解码器原理分析

LineBasedFrameDecoder:遍历ByteBuf中的可读字节,判断是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置之间的字节组成一行,它是以换行为结束标识符的解码器,支持携带结束符或不携带结束符两种方式解码,同时配置支持单行最大长度,如果连续读取最大长度后,仍没有发现换行符就会抛出异常,同时忽略掉之前读取到的异常码流。

StringDecoder:将收到的对象转换成字符串,然后调用后续的Handler。

LineBasedFrameDecoder + StringDecoder = 按行切换的文本解码器

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TCP粘包/拆包
  • 粘包拆包说明
  • 粘包、拆包发生原因
  • TCP粘包拆包的解决策略
  • Netty提供的解决方案
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档