Netty 源码深度解析(八) - 解码

就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。

当然这也适应于本文的主题:编码和解码,或者数据从一种特定协议的格式到另一种格式的转 换。这些任务将由通常称为编解码器的组件来处理

Netty 提供了多种组件,简化了为了支持广泛 的协议而创建自定义的编解码器的过程

例如,如果你正在构建一个基于 Netty 的邮件服务器,那 么你将会发现 Netty 对于编解码器的支持对于实现 POP3、IMAP 和 SMTP 协议来说是多么的宝贵

0 什么是编解码器

每个网络应用程序都必须定义

  • 如何解析在两个节点之间来回传输的原始字节
  • 如何将其和目标应用程序的数据格式做相互转换

这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式

那么它们的区别是什么呢?

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列— 它的数据。那 么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。

记住这些背景信息,接下来让我们研究一下 Netty 所提供的用于实现这两种组件的类。

1 Netty解码概述

1.1 两个问题

在这一节中,我们将研究 Netty 所提供的解码器类,这些类覆盖了两个不同的用例

  • 将字节解码为消息——ByteToMessageDecoder 和 ReplayingDecoder
  • 将一种消息类型解码为另一种——MessageToMessageDecoder

因为解码器是负责将入站数据从一种格式转换到另一种格式,所以知道 Netty 的解码器实

现了 ChannelInboundHandler 也不会让你感到意外

什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 Channel- InboundHandler 转换入站数据时会用到

此外,得益于ChannelPipeline 的设计,可以将多个解码器连接在一起,以实现任意复杂的转换逻辑,这也是 Netty 是如何支持代码的模块化以及复用的一个很好的例子

2 抽象解码器ByteToMessageDecoder

2.1 示例

将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 特地为它提供了一个抽象的基类:ByteToMessageDecoder

由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理

ByteToMessageDecoderAPI

假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理

在这种情况下,你需要从入站ByteBuf中读取每个 int,并将它传递给ChannelPipeline 中的下一个 ChannelInboundHandler

为了解码这个字节流,你要扩展 ByteToMessageDecoder类(原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)

ToIntegerDecoder

每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int,然后将它添加到一个 List 中

当没有更多的元素可以被添加到该 List 中时,它的内容将会被发送给下一个 Channel- InboundHandler

ToIntegerDecoder类扩展了ByteToMessageDecoder

虽然ByteToMessageDecoder可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐

在下一节中, 我们将讨论 ReplayingDecoder,它是一个特殊的解码器,以少量的开销消除了这个步骤

2.2 源码解析

解码步骤

2.2.1 累加字节流

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //基于 ByteBuf 进行解码的,如果不是直接将当前对象向下传播
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                //若当前累加器为空,说明是第一次从 IO 流中读取数据
                first = cumulation == null;
                if (first) {
                    //第一次会将累加器赋值为刚读进来的 ByteBuf 对象数据
                    cumulation = data;
                } else {
                    //非第一次,则将当前累加器中的数据和读取进来的数据进行累加
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //调用子类的解码方法去解析
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

其中的cumulator

看一下这个MERGE_CUMULATOR

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            //当前的写指针后移一定字节,若超过最大容量,则进行扩容
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain().
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            //将当前数据写到累加器中
            buffer.writeBytes(in);
            //将读进的数据对象释放
            in.release();
            return buffer;
        }
    };

2.2.2 调用子类的 decode方法进行解析

进入该方法查看源码

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // 只要累加器有数据,循环就会继续执行下去
            while (in.isReadable()) {
                int outSize = out.size();
                // 判断当前list 里是否已经有对象(首次执行时,肯定是不会运行此段代码的)
                if (outSize > 0) {
                    // 有,则通过事件传播机制向下传播
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                // 记录当前可读数据长度
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                //说明什么对象都没解析出来
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                //说明没有从当前累加器中读取数据
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

2.2.2 将解析到的 ByteBuf 向下传播

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
                // 记录当前 list 的长度
                int size = out.size();
                // 将解析到的一个对象向下进行传播
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

编解码器中的引用计数

对于编码器和解码器来说:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放

如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message)这将会增加该引用计数,从而防止该消息被释放

3 基于固定长度解码器分析

/**
 * A decoder that splits the received {@link ByteBuf}s by the fixed number
 * of bytes. For example, if you received the following four fragmented packets:
 * <pre>
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
 * </pre>
 * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
 * following three packets with the fixed length:
 * <pre>
 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+
 * </pre>
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;

    /**
     * Creates a new instance.
     *
     * @param frameLength the length of the frame
     */
    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    /**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   in              the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //判断当前累加器里的字节是否小于frameLength
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }
}

4 行解码器分析

非丢弃模式处理

4.1 定位行尾

4.2 非丢弃模式

4.2.1 找到换行符情况下

4.2.2 找不到换行符情况下

解析出长度超过最大可解析长度

直接进入丢弃模式,读指针移到写指针位(即丢弃)

并传播异常

4.3 丢弃模式

找到换行符

记录当前丢弃了多少字节(已丢弃 + 本次将丢弃的)

锁定换行符类型

将读指针直接移到换行符后

丢弃字节置零

重置为非丢弃状态

所有字节丢弃后才触发快速失败机制

找不到换行符

直接记录当前丢弃字节(已丢弃 + 当前可读字节数)

将读指针直接移到写指针

5 基于分隔符解码器分析

  • 构造器 传入一系列分隔符,通过解码器将二进制流分成完整数据包
  • decode 方法

5.1 分析解码步骤

5.1.1 行处理器

  • 行处理器决断
  • 定义位置
  • 初始化位置
  • 判断分隔符

5.1.2 找到最小分隔符

遍历所有分隔符,计算以每一个分隔符分割的数据包的长度

5.1.3 解码

5.1.3.1 找到分隔符

非空,说明已经找到分隔符

和之前一样,在此先判断当前是否处于丢弃模式

非丢弃模式

显然第一次时为 false, 因此非丢弃模式

当前数据包大于允许解析最大数据长度时,直接将该段数据包连同最小分隔符跳过(丢弃)

没有超过的就是正常合理逻辑的数据包的长度,判断解析出的数据包是否包含分隔符

丢弃模式

5.1.3.2 未找到分隔符

5.1.3.2.1 非丢弃模式

当前可读字节长大于允许解析最大数据长度时,记录该丢弃字节数

5.1.3.2.2 丢弃模式

6 基于长度域解码器参数分析

重要参数

  • maxFrameLength (包的最大长度)

防止太大导致内存溢出,超出包的最大长度 Netty 将会做一些特殊处理

  • lengthFieldOffset (消息体长度)

长度域的偏移量lengthFieldOffset,0表示无偏移

ByteBuf的什么位置开始就是length字段

  • lengthFieldLength

长度域length字段的长度

  • lengthAdjustment

有些情况可能会把header也包含到length长度中,或者length字段后面还有一些不包括在length长度内的,可以通过lengthAdjustment调节

  • initialBytesToStrip

起始截掉的部分,如果传递给后面的Handler的数据不需要消息头了,可以通过这个设置

可以通过消息中的一个表示消息长度的字段值动态分割收到的ByteBuf

6.1 基于长度

这类数据包协议比较常见,前几个字节表示数据包长度(不包括长度域),后面为具体数据

拆完后数据包是一个完整的带有长度域的数据包(之后即可传递到应用层解码器进行解码),

创建一个如下方式的LengthFieldBasedFrameDecoder即可实现这类协议

6.2 基于长度截断

若应用层解码器不需用到长度字段,那么我们希望 Netty 拆包后,如此

长度域被截掉,我们只需指定另一个参数 initialBytesToStrip 即可实现

表 Netty 拿到一个完整数据包后向业务解码器传递之前,应该跳过多少字节

initialBytesToStrip 为4,表获取一个完整数据包后,忽略前面4个字节,应用解码器拿到的就是不带长度域的数据包

6.3 基于偏移长度

此方式二进制协议更为普遍,前几个固定字节表示协议头,通常包含一些magicNumberprotocol version 之类的meta信息,紧跟着后面的是一个长度域,表示包体有多少字节的数据

只需要基于第一种情况,调整第二个参数既可以实现

lengthFieldOffset为4,表示跳过4个字节才是长度域

6.4 基于可调整长度的拆包

有些时候,二进制协议可能会设计成如下方式

长度域在前,header在后

  • 长度域在数据包最前面表示无偏移,lengthFieldOffset为 0
  • 长度域的长度为3,即lengthFieldLength为3
  • 长度域表示的包体的长度略过了header,这里有另外一个参数lengthAdjustment,包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包,这里是 12+2,header和包体一共占14字节

6.5 基于偏移可调整长度的截断

二进制协议带有两个header

拆完后,HDR1 丢弃,长度域丢弃,只剩下第二个header和有效包体

这种协议中,一般HDR1可以表示magicNumber,表示应用只接受以该magicNumber开头的二进制数据,RPC 里面用的较多

参数设置

  • 长度域偏移为1,即lengthFieldOffset为1
  • 长度域长度为2,即 lengthFieldLength为2
  • 长度域表示的包体的长度略过HDR2,但拆包时HDR2也被 Netty 当作包体的一部分来拆,HDR2的长度为1,即 lengthAdjustment 为1
  • 拆完后,截掉前面三个字节,即initialBytesToStrip 为 3

6.6 基于偏移可调整变异长度的截断

前面所有的长度域表示的都是不带header的包体的长度

如果让长度域表示的含义包含整个数据包的长度,如下

长度域字段值为16, 其字段长度为2,HDR1的长度为1,HDR2的长度为1,包体的长度为12,1+1+2+12=16

参数设置

除长度域表示的含义和上一种情况不一样外,其他都相同,因为 Netty 不了解业务情况,需告诉 Netty ,长度域后再跟多少字节就可形成一个完整数据包,这里显然是13字节,长度域为16,因此减掉3才是真是的拆包所需要的长度,lengthAdjustment为-3

若你的协议基于长度,即可考虑不用字节来实现,而是直接拿来用,或者继承他,简单修改即可

7 基于长度域解码器分析

7.1 构造方法

public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
    // 省略参数校验
    this.byteOrder = byteOrder;
    this.maxFrameLength = maxFrameLength;
    this.lengthFieldOffset = lengthFieldOffset;
    this.lengthFieldLength = lengthFieldLength;
    this.lengthAdjustment = lengthAdjustment;
    lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
    this.initialBytesToStrip = initialBytesToStrip;
    this.failFast = failFast;
}

把传参数保存在 field即可

  • byteOrder 字节流表示的数据是大端还是小端,用于长度域的读取
  • lengthFieldEndOffset 紧跟长度域字段后面的第一个字节的在整个数据包中的偏移量
  • failFast
    • 为true 表读取到长度域,TA的值的超过maxFrameLength,就抛 TooLongFrameException
    • false 表只有当真正读取完长度域的值表示的字节之后,才抛 TooLongFrameException,默认设为true,建议不要修改,否则可能会造成内存溢出

7.2 实现拆包抽象

具体的拆包协议只需要实现

void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 

in 表目前为止还未拆的数据,拆完之后的包添加到 out这个list中即可实现包向下传递

  • 第一层实现

重载的protected方法decode实现真正的拆包,以下三步走

1 计算需要抽取的数据包的长度

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 拿到实际的未调整过的包长度
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }
    }
  • 拿到长度域的实际字节偏移
  • 调整包的长度
  • 如果当前可读字节还未达到长度长度域的偏移,那说明肯定是读不到长度域的,直接不读

上面有个getUnadjustedFrameLength,若你的长度域代表的值表达的含义不是基本的int,short等基本类型,可重写该方法

比如,有的奇葩的长度域里面虽然是4个字节,比如 0x1234,但是TA的含义是10进制,即长度就是十进制的1234,那么覆盖这个函数即可实现奇葩长度域拆包

  1. 长度校验
  2. 整个数据包的长度还没有长度域长,直接抛异常
  • 数据包长度超出最大包长度,进入丢弃模式
- 当前可读字节已达到`frameLength`,直接跳过`frameLength`个字节,丢弃之后,后面有可能就是一个合法的数据包
- 当前可读字节未达到`frameLength`,说明后面未读到的字节也需丢弃,进入丢弃模式,先把当前累积的字节全部丢弃

bytesToDiscard 表还需丢弃多少字节

  • 最后,调用failIfNecessary判断是否需要抛出异常
    • 不需要再丢弃后面的未读字节(bytesToDiscard == 0),重置丢弃状态
      • 如果没有设置快速失败(!failFast),或者设置了快速失败并且是第一次检测到大包错误(firstDetectionOfTooLongFrame),抛出异常,让handler处理
      • 如果设置了快速失败,并且是第一次检测到打包错误,抛出异常,让handler去处理

前面我们可以知道failFast默认为true,而这里firstDetectionOfTooLongFrametrue,所以,第一次检测到大包肯定会抛出异常

3 丢弃模式的处理

LengthFieldBasedFrameDecoder.decoder方法入口处还有一段代码

  • 若当前处在丢弃模式,先计算需要丢弃多少字节,取当前还需可丢弃字节和可读字节的最小值,丢弃后,进入 failIfNecessary,对照着这个函数看,默认情况下是不会继续抛出异常,而如果设置了 failFast为false,那么等丢弃完之后,才会抛出异常

2 跳过指定字节长度的逻辑处理

在丢弃模式的处理及长度校验都通过后

  • 先验证当前是否已读到足够的字节,若读到了,在下一步抽取一个完整的数据包之前,需根据initialBytesToStrip的设置来跳过某些字节,当然,跳过的字节不能大于数据包的长度,否则抛 CorruptedFrameException 异常

抽取frame

  • 拿到当前累积数据的读指针,然后拿到待抽取数据包的实际长度进行抽取,抽取之后,移动读指针
  • 抽取的过程即调用了一下 ByteBufretainedSlice API,该API无内存copy的开销

从真正抽取数据包来看看,传入的参数为 int 型,所以自定义协议中,如果你的长度域是8字节,那么前4字节基本没用

小结

  • 如果你使用了Netty,并且二进制协议基于长度,考虑使用LengthFieldBasedFrameDecoder吧,通过调整各种参数,一定会满足你
  • LengthFieldBasedFrameDecoder的拆包包括合法参数校验,异常包处理,以及最后调用 ByteBuf 的retainedSlice来实现无内存copy的拆包

8 解码器总结

8.1 ByteToMessageDecoder 解码步骤

8.2 基于长度解码器步骤

8.3 两个问题

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏岑玉海

Hbase 学习(三)Coprocessors

Coprocessors 之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服...

41911
来自专栏日常分享

Java 循环队列的实现

  队列(Queue)是限定只能在一端插入、另一端删除的线性表。允许删除的一端叫做队头(front),允许插入的一端叫做队尾(rear),没有元素的队列称为“空...

2343
来自专栏跟着阿笨一起玩NET

开源实体映射框架EmitMapper介绍

EmitMapper是一个开源实体映射框架,地址:http://emitmapper.codeplex.com/。

1422
来自专栏黑泽君的专栏

day19_java基础加强_动态代理+注解+类加载器

        Proxy Pattern(即:代理模式),23种常用的面向对象软件的设计模式之一。         代理模式的定义:为其他对象提供一种代理以控...

1264
来自专栏difcareer的技术笔记

JNI实现源码分析【四 函数调用】正文0x01:dvmCallMethodV0x02:nativeFunc0x03: 何时赋值

有了前面的铺垫,终于可以说说虚拟机是如何调用JNI方法的了。JNI方法,对应Java中的native方法,所以我们跟踪对Native方法的处理即可。

994
来自专栏FD的专栏

写出形似QML的C++代码

我的第一个想法(居然?)是做个Embedded-DSL。不过C++又不是Ruby……随便搜了一下,发现了一篇文章,也只是利用了重载运算符和运算符优先级,看上去限...

682
来自专栏君赏技术博客

Object-C中的黑魔法

在Swift中存在Option类型,也就是使用?和!声明的变量。但是OC里面没有这个特征,因为在XCODE6.3之后出现新的关键词定义用于OC转SWIFT时候可...

1871
来自专栏偏前端工程师的驿站

(cljs/run-at (JSVM. :all) "细说函数")

1161
来自专栏Java编程技术

ClassLoader解惑

一个Java程序要想运行起来,首先需要经过编译生成 .class文件,然后创建一个运行环境(jvm)来加载字节码文件到内存运行,而.class 文件是怎样被加载...

1481
来自专栏静晴轩

lua表排序

Lua作为一种很强大且轻量级脚本语言的存在,对于掌握其几乎无所不能的Table(其实就是一个Key Value的数据结构,它很像Javascript中的Obje...

47811

扫码关注云+社区

领取腾讯云代金券