假如客户端给服务端发送数据,那么服务端的Netty从网络中读取的数据都是连续的字节流数据,同时粘包和拆包也在'捣乱',如何读取一个完整的数据包, 这个重担就落在了解码器的身上.
本篇文章介绍下使用广泛的LengthFieldBasedFrameDecoder解码器.在介绍之前, 先看个总览图
简单描述上面这张图, 假如客户端给服务端发送数据.
第一次当数据(HEL)到达服务端之后,Netty中的NioByteUnsafe类会从TCP缓冲区将数据(HEL)读取出来并封装成一个ByteBuf传给帧解码器.
第二次当数据(LO,W)也到达服务端之后,相同的操作,将数据(LO,W)再传给帧解码器.
第三次当数据(ORLD,HELLO)也到达服务端之后,相同的操作,将数据(ORLD,HELLO)再传给帧解码器.
这个时候,帧解码器发现,三次读取的数据拼接在一起是HELLO,WORLD,HELLO.而HELLO,WORLD已经是一个完整的帧数据了(具体何种数据才是一个完整的帧是由使用者自己规定的).
说明一下,三次数据读取之后,帧解码器拿到的数据的确是HELLO,WORLD,HELLO 但是这17个字符表达的含义是什么,帧解码器不知道.它只是知道HELLO,WORLD这11个字符就是一个完整的帧数据,帧解码器需要把这11个字符传给业务解码器,由业务解码器去判断这11个字符的具体含义.
接下来看下,LengthFieldBasedFrameDecoder这个帧解码器如何从毫无含义的一串数据中'截取'出一个帧数据的.
有4个重要的属性,如下
// 偏移量
private final int lengthFieldOffset;
// 长度
private final int lengthFieldLength;
// 调整
private final int lengthAdjustment;
// 跳过
private final int initialBytesToStrip;
上面一长串数据(十六进制数据),假如此时已经读取到了数据F了(即数据F之前的数据已经读完了,包括数据F),这个时候,根据规则需要从C3开始一直读取到05才是我们设定的一个帧的数据.该如何读取呢?这个就需要上面的四个属性支持了.
lengthFieldOffset的含义是偏移量,即距离最后一次读位置的长度.因为此时已经读取到字母F处,当lengthFieldOffset=1,表示偏移1个字节,从F向后数1个字节,到了C3的位置.
lengthFieldLength表示数据字段长度的长度.当lengthFieldLength=2,就是接着上面C3的位置继续向后'走'2个字节.取出这2个字节的内容,内容就是0002.
由于内容0002等于2,也就是说继续向后读取2个字节,但是稍等下,还有一个lengthAdjustment属性,它是调节的功能,假如lengthAdjustment=3,那么2+3=5才是真正需要向后读取的数据.也就是说,最后我们读取的数据是C300020102030405
到目前为止,我们读取到的数据是C300020102030405这些内容.然而我们只想把0102030405传给后面的业务解码器,这个时候就需要使用initialBytesToStrip属性,它是跳过指定字节的意思.initialBytesToStrip=3就是要跳过3个字节,也就是跳过C30002这3个字节.把剩下的0102030405传给业务解码器.
接下来直接通过分析源码的方式, 学习它的解码过程.
// 源码核心方法
io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)
假如此时decode方法是个空方法,我们通过加法的方式讲解源码,一点一点向方法中添加代码.
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength ,构造函数中设置的
// in.readableBytes()表示可读的数据
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
}
如下图所示,由于客户端此时才发送过来C300这2个字节的数据. 而lengthFieldEndOffset=lengthFieldOffset + lengthFieldLength=3个字节(即C30002这三个字节). 因为只有根据长度(即图中的0x0002)才能知道接下来需要继续读取多少的实际内容,可目前已经接收的数据还不够辨识出来长度的数据,只能继续等待客户端发送足够的数据过来.
继续
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
// 由于每读取一个帧数据, readerIndex就会增长一个帧的长度.
// 所以在读取当前帧的时候, 当前帧在整个数据串的偏移量是in.readerIndex() + lengthFieldOffset
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 从实际偏移量的位置(即actualLengthFieldOffset)读取lengthFieldLength长度的数据
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
// 上面两行代码的含义是: 从整个数据串的可读位置向后
// 偏移lengthFieldOffset的长度后再读取lengthFieldLength长度的数据作为frameLength
// 如果小于0则抛异常
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
}
继续
如上图,读取出来的frameLength值可能即表示L1,也可能表示L2的长度,这个是由程序员自己定义的,因为前面我们根据偏移量和长度已经读取到了位置B,我们就是需要再读取长度L1的数据,如果frameLength表示的是L2的长度,那么我们还需要跳过(L2-L1)的长度,文章之前也说过,这里需要使用lengthAdjustment的值进行调整.即frameLength+lengthAdjustment才表示L1的长度.
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// 即 frameLength = frameLength + lengthAdjustment + lengthFieldEndOffset;
frameLength += lengthAdjustment + lengthFieldEndOffset;
// #1
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
}
根据代码和图片分析,如果我们从lengthFieldLength中读取的长度(即图中0002)加上调整长度lengthAdjustment大于0的话,再加上lengthFieldEndOffset一定会大于lengthFieldEndOffset.
既然会出现#1中的frameLength < lengthFieldEndOffset问题, 说明原先的frameLength+lengthAdjustment小于0了, 说明这个数据有问题, 要读取的实际有含义的数据怎么会小于0呢.
继续
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
// 如果整个帧的长度大于我们设置的最大长度
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
}
一旦frameLength大于了maxFrameLength,那么需要无视这个帧,把这些数据直接跳过.
private void exceededFrameLength(ByteBuf in, long frameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
// 说明frameLength < in.readableBytes() 也就是说现在已经读取到的数据
// 足够跳过一个完整的帧长度frameLength
if (discard < 0) {
// 直接跳过这些无效的数据
in.skipBytes((int) frameLength);
} else {
// 说明frameLength > in.readableBytes()
// 也就是说目前已经读取到的数据还不够跳过
discardingTooLongFrame = true;
// 还有discard这么多数据需要跳过,只是目前没有这么多数据了,
// 等到下次读取到数据,再从里面拿出来discard个数据再跳过
bytesToDiscard = discard;
// 那么先把已经读取的数据跳过
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
}
在decode方法一开始的时候,还有一个方法,之前没写,是因为还没讲到这块,它的作用就是把'亏欠'的数据读取出来并跳过.
private void discardingTooLongFrame(ByteBuf in) {
// 亏欠bytesToDiscard个数据
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
// '还钱'操作
in.skipBytes(localBytesToDiscard);
// 记录还了这次,还亏欠多少
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
把最后的内容梳理完
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
int frameLengthInt = (int) frameLength;
// 还不够读取一个帧, 直接返回
if (in.readableBytes() < frameLengthInt) {
return null;
}
// 比如读取帧的长度是100个字节,可是你让我去除105个字节
// 说白了,还是这个帧不合法
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// 正常提取一个帧的数据
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
一图概之