Netty粘包拆包解决方案

前言

本篇文章是Netty专题的第六篇,前面五篇文章如下:

TCP黏包拆包

TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

怎么解决?

  • 消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
  • 在数据包尾部添加特殊分隔符,比如下划线,中划线等
  • 将消息分为消息头和消息体,消息头中包含表示信息的总长度

Netty提供了多个解码器,可以进行分包的操作,分别是:

  • LineBasedFrameDecoder (回车换行分包)
  • DelimiterBasedFrameDecoder(特殊分隔符分包)
  • FixedLengthFrameDecoder(固定长度报文来分包)
  • LengthFieldBasedFrameDecoder(自定义长度来分包)

制造粘包和拆包问题

为了验证我们的解码器能够解决这种粘包和拆包带来的问题,首先我们就制造一个这样的问题,以此用来做对比。

服务端:

public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() { 
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("decoder", new StringDecoder());
                        ch.pipeline().addLast("encoder", new StringEncoder());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                System.err.println("server:" + msg.toString());
                                ctx.writeAndFlush(msg.toString() + "你好" );
                            }
                        });
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture f = bootstrap.bind(2222).sync();
             f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

客户端我们发送一个比较长的字符串,如果服务端收到的消息是一条,那么就是对的,如果是多条,那么就有问题了。

public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Channel channel = null;
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new StringDecoder());
                    ch.pipeline().addLast("encoder", new StringEncoder());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            System.err.println("client:" + msg.toString());
                        }
                    });
                }
            });
            ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
            channel = f.channel();
            StringBuilder msg = new StringBuilder();
            for (int i = 0; i < 100; i++) {
                msg.append("hello yinjihuan");
            }
            channel.writeAndFlush(msg);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

首先启动服务端,然后再启动客户端,通过控制台可以看到服务接收的数据分成了2次,这就是我们要解决的问题。

server:hello yinjihuanhello....
server:o yinjihuanhello...

LineBasedFrameDecoder

用LineBasedFrameDecoder 来解决需要在发送的数据结尾加上回车换行符,这样LineBasedFrameDecoder 才知道这段数据有没有读取完整。

改造服务端代码,只需加上LineBasedFrameDecoder 解码器即可,构造函数的参数是数据包的最大长度。

 public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new LineBasedFrameDecoder(10240));
      ch.pipeline().addLast("decoder", new StringDecoder());
      ch.pipeline().addLast("encoder", new StringEncoder());
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.err.println("server:" + msg.toString());
                ctx.writeAndFlush(msg.toString() + "你好");
            }
     });
}

改造客户端发送代码,再数据后面加上回车换行符

ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
    msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg + System.getProperty("line.separator"));

DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder和LineBasedFrameDecoder差不多,DelimiterBasedFrameDecoder可以自己定义需要分割的符号,比如下划线,中划线等等。

改造服务端代码,只需加上DelimiterBasedFrameDecoder解码器即可,构造函数的参数是数据包的最大长度。我们用下划线来分割。

 public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes())));
      ch.pipeline().addLast("decoder", new StringDecoder());
      ch.pipeline().addLast("encoder", new StringEncoder());
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.err.println("server:" + msg.toString());
                ctx.writeAndFlush(msg.toString() + "你好");
            }
     });
}

改造客户端发送代码,再数据后面加上下划线

ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
    msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg + "_");

FixedLengthFrameDecoder

FixedLengthFrameDecoder是按固定的数据长度来进行解码的,也就是说你客户端发送的每条消息的长度是固定的,下面我们看看怎么使用。 服务端还是一样,增加FixedLengthFrameDecoder解码器即可。

 public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new FixedLengthFrameDecoder(1500));
      ch.pipeline().addLast("decoder", new StringDecoder());
      ch.pipeline().addLast("encoder", new StringEncoder());
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.err.println("server:" + msg.toString());
                ctx.writeAndFlush(msg.toString() + "你好");
            }
     });
}

客户端,msg输出的长度就是1500

ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
    msg.append("hello yinjihuan");
}
System.out.println(msg.length());
channel.writeAndFlush(msg);

LengthFieldBasedFrameDecoder

服务端代码:

 public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
      ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
      ch.pipeline().addLast("decoder", new StringDecoder());
      ch.pipeline().addLast("encoder", new StringEncoder());
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.err.println("server:" + msg.toString());
                ctx.writeAndFlush(msg.toString() + "你好");
            }
     });
}

客户端,直接发送就行

ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
    msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg);

LengthFieldBasedFrameDecoder的详细使用可以参考这篇文章:http://blog.csdn.net/u010853261/article/details/55803933

源码参考:https://github.com/yinjihuan/netty-im

原文发布于微信公众号 - 猿天地(cxytiandi)

原文发表时间:2018-03-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏xdecode

Java高并发之无锁与Atomic源码分析

2034
来自专栏码匠的流水账

No thread-bound request found异常

本文主要研究下spring mvc的No thread-bound request found异常

2160
来自专栏jeremy的技术点滴

py3_cookbook_notes_02

37812
来自专栏JAVA技术站

JFinal 参数校验插件扩展,让后台参数校验像js一样方式好用

email=^\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*$,"邮箱格式不正确" chinese=^[...

2632
来自专栏wannshan(javaer,RPC)

dubbo通信消息解析过程分析(1)

由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,今天就先从一个点说起。 说说,dubbo通过netty框架做传...

4736
来自专栏小灰灰

EventBus源码学习笔记(二)

EventBus深入学习二 开始研究源码的设计思路,从Listener注册出发,EventBus 如何维护监听者信息,到Publisher发送消息,消息以怎样...

2565
来自专栏搜云库

Java并发基础:了解无锁CAS就从源码分析

CAS的全称为Compare And Swap,直译就是比较交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,...

1474
来自专栏码洞

大厨小鲜——基于Netty自己动手编写RPC框架

今天我们要来做一道小菜,这道菜就是RPC通讯框架。它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架。

1952
来自专栏Java大联盟

23种设计模式详解(六)

1022
来自专栏大内老A

WCF技术剖析之十五:数据契约代理(DataContractSurrogate)在序列化中的作用

如果一个类型,不一定是数据契约,和给定的数据契约具有很大的差异,而我们要将该类型的对象序列化成基于数据契约对应的XML。反之,对于一段给定的基于数据契约的XML...

1927

扫码关注云+社区