作者:莫那鲁道
链接:https://www.jianshu.com/p/4c35541eec10
前言
Netty 的解码器有很多种,比如基于长度的,基于分割符的,私有协议的。但是,总体的思路都是一致的。
拆包思路:当数据满足了 解码条件时,将其拆开。放到数组。然后发送到业务 handler 处理。
半包思路: 当读取的数据不够时,先存起来,直到满足解码条件后,放进数组。送到业务 handler 处理。
而实现这个逻辑的就是我们今天的主角:ByteToMessageDecoder。
看名字的意思是:将字节转换成消息的解码器。人如其名。而它本身也是一个入站 handler,所以,我们还是从他的 channelRead 方法入手。
1. channelRead 方法
精简过的代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 从对象池中取出一个List
CodecOutputList out = CodecOutputList.newInstance();
ByteBuf data = (ByteBuf) msg;
first = cumulation == null; if (first) { // 第一次解码
cumulation = data;// 累计
} else { // 第二次解码,就将 data 向 cumulation 追加,并释放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
} // 得到追加后的 cumulation 后,调用 decode 方法进行解码
// 解码过程中,调用 fireChannelRead 方法,主要目的是将累积区的内容 decode 到 数组中
callDecode(ctx, cumulation, out); // 如果累计区没有可读字节了
if (cumulation != null && !cumulation.isReadable()) { // 将次数归零
numReads = 0; // 释放累计区
cumulation.release(); // 等待 gc
cumulation = null;
} // 如果超过了 16 次,就压缩累计区,主要是将已经读过的数据丢弃,将 readIndex 归零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
} int size = out.size(); // 如果没有向数组插入过任何数据
decodeWasNull = !out.insertSinceRecycled(); // 循环数组,向后面的 handler 发送数据,如果数组是空,那不会调用
fireChannelRead(ctx, out, size); // 将数组中的内容清空,将数组的数组的下标恢复至原来
out.recycle();
}
楼主已经在方法中写了注释,但还是说说主要的步骤:
下面来说说详细的步骤。
2. 从对象池中取出一个空的数组
代码:
@1CodecOutputList out = CodecOutputList.newInstance();
@2static CodecOutputList newInstance() { return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL = new FastThreadLocal<CodecOutputLists>() { @Override
protected CodecOutputLists initialValue() throws Exception { // 16 CodecOutputList per Thread are cached.
return new CodecOutputLists(16);
}
};
@4CodecOutputLists(int numElements) {
elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)]; for (int i = 0; i < elements.length; ++i) { // Size of 16 should be good enough for the majority of all users as an initial capacity.
elements[i] = new CodecOutputList(this, 16);
}
count = elements.length;
currentIdx = elements.length;
mask = elements.length - 1;
}
@5private CodecOutputList(CodecOutputListRecycler recycler, int size) { this.recycler = recycler;
array = new Object[size];
}
@6public CodecOutputList getOrCreate() { if (count == 0) { // Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
// low.
return new CodecOutputList(NOOP_RECYCLER, 4);
}
--count; int idx = (currentIdx - 1) & mask;
CodecOutputList list = elements[idx];
currentIdx = idx; return list;
}
代码分为 1,2,3,4,5, 6 步骤。
由于这个 getOrCreate 方法会被一个线程的多个地方使用,因此 16 是个统计值。当 16 不够的时候,就会创建一个新的 List。也就是 count == 0 的逻辑。而 & mask 的操作就是一个取模的操作。
3. 写入累积区
代码如下:
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
这个 cumulator 默认是个 Cumulator 类型的 MERGE_CUMULATOR,该实例最主要的是从重写了 cumulate 方法:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { final ByteBuf buffer; if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release(); return buffer;
}
};
可以看到该方法,主要是将 unsafe.read 传递过来的 ByteBuf 的内容写入到 cumulation 累积区中,然后释放掉旧的内容,由于这个变量是成员变量,因此可以多次调用 channelRead 方法写入。
同时这个方法也考虑到了扩容的问题,总的来说就是 copy。
当然,ByteToMessageDecoder 中还有一个 Cumulator 实例,称之为 COMPOSITE_CUMULATOR,混合累积。由于上个实例的 cumulate 方法是使用内存拷贝的,因此,这里提供了使用混合内存。相较于拷贝,性能会更好点,但同时也会更复杂。
4. decode 方法的作用
当数据追击到累积区之后,需要调用 decode 方法进行解码,代码如下:
@ 1callDecode(ctx, cumulation, out);
@2
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // 如果累计区还有可读字节
while (in.isReadable()) { int outSize = out.size(); // 上次循环成功解码
if (outSize > 0) { // 调用后面的业务 handler 的 ChannelRead 方法
fireChannelRead(ctx, out, outSize); // 将 size 置为0
out.clear();//
if (ctx.isRemoved()) { break;
}
outSize = 0;
} // 得到可读字节数
int oldInputLength = in.readableBytes(); // 调用 decode 方法,将成功解码后的数据放入道 out 数组中,可能会删除当前节点,删除之前会将数据发送到最后的 handler
decodeRemovalReentryProtection(ctx, in, out);// decode()
if (ctx.isRemoved()) { break;
} if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break;
} else { continue;
}
} if (isSingleDecode()) { break;
}
}
}
该方法主要逻辑:只要累积区还有未读数据,就循环进行读取。
所以,这段代码的关键就是子类需要重写 decode 方法,将累积区的数据正确的解码并添加到数组中。
每添加一次成功,就会调用 fireChannelRead 方法,将数组中的数据传递给后面的 handler。完成之后将数组的 size 设置为 0.
所以,如果你的业务 handler 在这个地方可能会被多次调用。也可能一次也不调用。取决于数组中的值。当然,如果解码 handler 被移除了,就会将累积区的所有数据刷到后面的 handler。
5. 剩下的逻辑
上面的逻辑就是解码器最主要的逻辑:
将 read 方法的数据读取到累积区,使用解码器解码累积区的数据,解码成功一个就放入到一个数组中,并将数组中的数据一次次的传递到后面的handler。
从上面的逻辑看,除非 handler 被移除,否则不会调用后面的 handler 方法,也就是说,只要不满足解码器的解码规则,就不会传递给后面的 handler。
再看看后面的逻辑,主要在 finally 块中:
这样就能节省一些内存了,但这会引起一些内存复制的过程,以性能损耗为前提的。
如果是 true,则会判断 autoRead 属性,如果是 false 的话,那么 Netty 认为还有数据没有读到,不然数组为什么一直是空的?就主动调用 read 方法从 Socket 读取。
答:如果是单次解码器,就需要发送了,因此单词解码器是不会再 callDecode 方法中发送的。
最后一行的 recycler.recycle(this),有两种结果,如果是 CodecOutputLists 的 recycle 方法,内容如下:
恢复数组下标,对 count ++,表示有对象可用了。
还有第二种,当 16 个数组不够用了,就需要创建一个新的,在 getOrCreate 方法体现。而构造函数中的 recycler 是一个空对象。我们看看这个对象:
当调用 recycle 方法的时候,什么都不做。等待 GC 回收。因为这不是个对象池的引用。
好,到这里,关于 ByteToMessageDecoder 解码器的主要功能就解读完了。
5. 总结
可以说,ByteToMessageDecoder 是解码器的核心所做,Netty 在这里使用了模板模式,留给子类扩展的方法就是 decode 方法。
主要逻辑就是将所有的数据全部放入累积区,子类从累积区取出数据进行解码后放入到一个 数组中,ByteToMessageDecoder 会循环数组调用后面的 handler 方法,将数据一帧帧的发送到业务 handler 。完成这个的解码逻辑。
使用这种方式,无论是粘包还是拆包,都可以完美的实现。
还有一些小细节:
Netty 所有的解码器,都可以在此类上扩展,一切取决于 decode 的实现。只要遵守 ByteToMessageDecoder 的约定即可。
END