Netty之粘包分包

粘包现象

客户端在一个for循环内连续发送1000个hello给Netty服务器端,  

1         Socket socket = new Socket("127.0.0.1", 10101);
2         for(int i = 0; i < 1000; i++){
3             socket.getOutputStream().write(“hello”.getBytes());
4         }        
5         socket.close();

 而在服务器端接受到的信息并不是预期的1000个独立的Hello字符串.

实际上是无序的hello字符串混合在一起, 如图所示. 这种现象我们称之为粘包.

为什么会出现这种现象呢? TCP是个”流”协议,流其实就是没有界限的一串数据。 

TCP底层中并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包划分,

所以在TCP中就有可能一个完整地包会被TCP拆分成多个包,也有可能吧多个小的包封装成一个大的数据包发送。

分包处理

顾名思义, 我们要对传输的数据进行分包. 一个简单的处理逻辑是在发送数据包之前, 先用四个字节占位, 表示数据包的长度. 

数据包结构为:

|    长度(4字节)    |    数据    |

 1         Socket socket = new Socket("127.0.0.1", 10101);
 2         String message = "hello";
 3         byte[] bytes = message.getBytes();
 4         ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
 5         // 消息长度
 6         buffer.putInt(bytes.length);
 7         // 消息正文
 8         buffer.put(bytes);
 9         byte[] array = buffer.array();
10         for(int i = 0; i < 1000; i++){
11             socket.getOutputStream().write(array);
12         }
13         socket.close();

服务器端代码, 我们需要借助于FrameDecoder类来分包.

 1 public class MyDecoder extends FrameDecoder {
 2 
 3     @Override
 4     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
 5 
 6         if(buffer.readableBytes() > 4){            
 7             //标记
 8             buffer.markReaderIndex();
 9             //长度
10             int length = buffer.readInt();
11             
12             if(buffer.readableBytes() < length){
13                 buffer.resetReaderIndex();
14                 //缓存当前剩余的buffer数据,等待剩下数据包到来
15                 return null;
16             }
17             
18             //读数据
19             byte[] bytes = new byte[length];
20             buffer.readBytes(bytes);
21             //往下传递对象
22             return new String(bytes);
23         }
24         //缓存当前剩余的buffer数据,等待剩下数据包到来
25         return null;
26     }
27 
28 }

如此一来, 我们再次在服务器端接受到的消息就是按序打印的hello了.

这边可能有个疑问, 为什么MyDecoder中数据没有读取完毕, 需要return null,

正常的pipeline在数据处理完都是要sendUpstream, 给下一个pipeline的.

这个需要看下FrameDecoder.messageReceived 的源码. 他在其中缓存了一个cumulation对象, 

如果return了null, 他会继续往缓存里写数据来实现分包

 1     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
 2         Object m = e.getMessage();
 3         if (!(m instanceof ChannelBuffer)) {
 4           // 数据读完了, 转下一个pipeline
 5             ctx.sendUpstream(e);
 6         } else {
 7             ChannelBuffer input = (ChannelBuffer)m;
 8             if (input.readable()) {
 9                 if (this.cumulation == null) {
10                     try {
11                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
12                     } finally {
13                         this.updateCumulation(ctx, input);
14                     }
15                 } else {
16                  // 缓存上一次没读完整的数据
17                     input = this.appendToCumulation(input);
18 
19                     try {
20                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
21                     } finally {
22                         this.updateCumulation(ctx, input);
23                     }
24                 }
25 
26             }
27         }
28     }

那么是不是这样就万事大吉了呢?

Socket字节流攻击

在上述代码中, 我们会在服务器端为客户端发送的数据包长度, 预先分配byte数组.

如果遇到恶意攻击, 传入的数据长度与内容 不匹配. 例如声明数据长度为Integer.MAX_VALUE.

这样会消耗大量的服务器资源生成byte[], 显然是不合理的.

因此我们还要加个最大长度限制.

1           if(buffer.readableBytes() > 2048){
2              buffer.skipBytes(buffer.readableBytes());
3           }

新的麻烦也随之而来, 虽然可以跳过指定长度, 但是数据包本身就乱掉了.

因为长度和内容不匹配, 跳过一个长度后, 不知道下一段数据的开头在哪里了.

因此我们自定义数据包里面, 不仅要引入数据包长度, 还要引入一个包头来划分各个包的范围.

包头用任意一段特殊字符标记即可, 例如$$$.

 1         // 防止socket字节流攻击
 2         if(buffer.readableBytes() > 2048){
 3           buffer.skipBytes(buffer.readableBytes());
 4         }
 5         // 记录包头开始的index
 6         int beginReader = buffer.readerIndex();
 7         
 8         while(true) {
 9             if(buffer.readInt() == ConstantValue.FLAG) {
10                 break;
11             }
12         }

新的数据包结构为:

|    包头(4字节)    |    长度(4字节)    |    数据    |

Netty自带拆包类

自己实现拆包虽然可以细粒度控制, 但是也会有些不方便, 可以直接调用Netty提供的一些内置拆包类.

  • FixedLengthFrameDecoder 按照特定长度组包
  • DelimiterBasedFrameDecoder 按照指定分隔符组包, 例如本文中的$$$
  • LineBasedFrameDecoder 按照换行符进行组包, \r \n等等
  • ......

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

spring security运行时配置ignore url

以前用shiro的比较多,不过spring boot倒是挺推崇自家的spring security的,有默认的starter,于是也就拿来用了。

691
来自专栏Java架构师学习

带你深入了解Java线程中的那些事

引言 说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢? public class Thre...

3208
来自专栏一个番茄说

让你在WebView中用JS调Native Object

之所做这个东西,源于之前项目中需要把一些页面用webView来呈现,但是web中需要调用native的方法,比如获取本地存的某些数据、调用摄像头等等,这里也就是...

1003
来自专栏小灰灰

SpringMVC返回图片的几种方式

主要借助的是 HttpServletResponse这个对象,实现case如下

2457
来自专栏GreenLeaves

C# 文件读写系列二

读取文件原则上非常简单,但它不是通过FileInfo和DirectoryInfo来完成的,关于FileInfo和DirectoryInfo请参考C# 文件操作系...

2959
来自专栏Java 技术分享

JavaWeb 之文件的上传下载

4525
来自专栏zhisheng

JAVA虚拟机关闭钩子(Shutdown Hook)

当你认真的去看一个组件的源码的时候,你会经常看见这种关闭钩子的函数,如果你不了解的话,谷歌一下,你就会发现如下文章就是搜索引擎出来的第一篇,不愧是出自我们优秀的...

2073
来自专栏安恒网络空间安全讲武堂

PWNCTF部分复现

根据readData和writedata函数的逻辑发现数组是char [22][12],主要是判断越界的if语句有逻辑漏洞

1352
来自专栏Elasticsearch实验室

Elasitcsearch 底层系列 Lucene 内核解析之 Doc Value

       Elasticsearch 支持行存和列存,行存用于以文档为单位顺序存储多个文档的原始内容,在 Elasitcsearch 底层系列 Lucene...

2764
来自专栏SDNLAB

Open vSwitch系列之openflow版本兼容

众所周知Open vSwitch支持的openflow版本从1.0到1.5版本(当前Open vSwitch版本是2.3.2)通过阅读代码,处理openflow...

54213

扫码关注云+社区