前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty中数据包的拆分粘包处理方案,以及对protobuf协议中的拆包粘包方案自定义重写

Netty中数据包的拆分粘包处理方案,以及对protobuf协议中的拆包粘包方案自定义重写

作者头像
小勇DW3
发布2020-04-26 14:04:19
1.5K0
发布2020-04-26 14:04:19
举报
文章被收录于专栏:小勇DW3小勇DW3小勇DW3

1、netty中的拆分粘包处理方案

TCP粘包和拆包

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

img
img

如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

TCP粘包和拆包产生的原因

数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

img
img

详细来说,造成粘包和拆包的原因主要有以下三个:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
  2. 进行MSS大小的TCP分段
  3. 以太网帧的payload大于MTU进行IP分片。
img
img

粘包和拆包的解决方法

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

  1. 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  2. 将回车换行符作为消息结束符
  3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  4. 通过在消息头中定义长度字段来标识消息的总长度

因为前3个在实际中用的非常少,所以这里主要对4进行说明。

使用LengthFieldBasedFrameDecoder作为decoder实现,LengthFieldBasedFrameDecoder构造函数,第一个参数为信息最大长度,超过这个长度回报异常,第二参数为长度属性的起始(偏移)位,我们的协议中长度是0到第3个字节,所以这里写0,第三个参数为“长度属性”的长度,我们是4个字节,所以写4,第四个参数为长度调节值,在总长被定义为包含包头长度时,修正信息长度,第五个参数为跳过的字节数,根据需要我们跳过前4个字节,以便接收端直接接受到不含“长度属性”的内容。

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
            // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到User对象
            ch.pipeline().addLast(new JsonDecoder());
            // 对响应数据进行编码,主要是将User对象序列化为json
            ch.pipeline().addLast(new JsonEncoder());
            // 处理客户端的请求的数据,并且进行响应
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}
这里EchoServer主要是在pipeline中添加了两个编码器和两个解码一器,编码器主要是负责将响应的User对象序列化为json对象,然后在其字节数组前面添加一个长度字段的字节数组;解码一器主要是对接收到的数据进行长度字段的解码,然后将其反序列化为一个User对象

2、Protobuf协议传输中对粘包和拆包自定义处理

之所以进行自定义处理是因为项目中的客户端不是使用netty来写的,使用基于c++的原生socket实现,所以为了和客户端一致,对

protobuf协议进行了修改:

   private static void nettyProcessService(final Properties prop, final KafkaStringProducerService kafkaProducerService1,
                                            final KafkaStringProducerService kafkaProducerService2, final KafkaStringProducerService kafkaProducerService3,
                                            final ExecutorService executor1, final ExecutorService executor2, final ConcurrentHashMap<String, Channel> mapChannels, final RedisPool redisPool)
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        try {
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 半包处理
                            //socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            socketChannel.pipeline().addLast(new ProtobufFixed32FrameDecoderRedefine());
                            socketChannel.pipeline().addLast(new ProtobufDecoder(protobuf.MsgProto.MsgProtoInfo.getDefaultInstance()));
                            //socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            socketChannel.pipeline().addLast(new ProtobufFixed32LengthFieldPrependerRedefine());
                            socketChannel.pipeline().addLast(new ProtobufEncoder());
                            socketChannel.pipeline().addLast(new SamplingReqServerHandler(prop, kafkaProducerService1, kafkaProducerService2,
                                    kafkaProducerService3, executor1, executor2, mapChannels, redisPool));
                        }
                    });

            ChannelFuture future = b.bind(Integer.parseInt(prop.getProperty("PORT"))).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            logger.info("**************** Netty Serve 已关闭 ****************");
        }
    }

这里主要说明对ProtobufFixed32FrameDecoder进行复写,修改其编解码函数。

ProtobufFixed32FrameDecoderRedefine

public class ProtobufFixed32FrameDecoderRedefine extends ByteToMessageDecoder {

    public ProtobufFixed32FrameDecoderRedefine()
    {
    }

    @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    {
            in.markReaderIndex();
            int preIndex = in.readerIndex();
            in.markReaderIndex();
            byte[] frontBytes = new byte[4];
            if (in.readableBytes() < 4){
                throw new CorruptedFrameException("less min length[4]: " + in.readableBytes());
            }
            in.readBytes(frontBytes);  //读取前4个字节
            int length = bytesToInt(frontBytes); //自定义字节序获取前四个字节表示的长度
            if (preIndex != in.readerIndex()) {
                if (length < 0) {
                    throw new CorruptedFrameException("negative length: " + length);
                } else {
                    if (in.readableBytes() < length) {
                        in.resetReaderIndex();
                    } else {
                        out.add(in.readRetainedSlice(length));  //读取相应长度的数据
                    }

                }
            }
        }

    public static int bytesToInt(byte b[]) {
        return  b[3] & 0xff
                | (b[2] & 0xff) << 8
                | (b[1] & 0xff) << 16
                | (b[0] & 0xff) << 24;
    }
}
ProtobufFixed32LengthFieldPrependerRedefine复写改动:
继承MessageToByteEncoder方案
public class ProtobufFixed32LengthFieldPrependerRedefine extends MessageToByteEncoder<ByteBuf> {

    public ProtobufFixed32LengthFieldPrependerRedefine() {
    }

    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = 4;
        out.ensureWritable(headerLen + bodyLen);  //前4个字节+数据长度
        writeRawVarint32(out, bodyLen);  //把body的长度写到前四个字节,int转为网络需
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

    static void writeRawVarint32(ByteBuf out, int value) {

        byte[] frontBytes = intToBytes(value);  //int转为网络序

        out.writeBytes(frontBytes);
    }

    //写入的时候,把 int 转化为网络序
    public static byte[] intToBytes(int n) {
        byte[] b = new byte[4];
        b[3] = (byte) (n & 0xff);
        b[2] = (byte) (n >> 8 & 0xff);
        b[1] = (byte) (n >> 16 & 0xff);
        b[0] = (byte) (n >> 24 & 0xff);
        return b;
    }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、netty中的拆分粘包处理方案
    • TCP粘包和拆包
      • TCP粘包和拆包产生的原因
        • 粘包和拆包的解决方法
        • 2、Protobuf协议传输中对粘包和拆包自定义处理
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档