Netty源码阅读入门实战(八)-解码(更新 ing)

就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。 当然这也适应于本文的主题:编码和解码,或者数据从一种特定协议的格式到另一种格式的转 换。这些任务将由通常称为编解码器的组件来处理 Netty 提供了多种组件,简化了为了支持广泛 的协议而创建自定义的编解码器的过程 例如,如果你正在构建一个基于 Netty 的邮件服务器,那 么你将会发现 Netty 对于编解码器的支持对于实现 POP3、IMAP 和 SMTP 协议来说是多么的宝贵

0 什么是编解码器

每个网络应用程序都必须定义

  • 如何解析在两个节点之间来回传输的原始字节
  • 如何将其和目标应用程序的数据格式做相互转换

这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式

那么它们的区别是什么呢? 如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列— 它的数据。那 么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。 记住这些背景信息,接下来让我们研究一下 Netty 所提供的用于实现这两种组件的类。

1 Netty解码概述

1.1 两个问题

在这一节中,我们将研究 Netty 所提供的解码器类,这些类覆盖了两个不同的用例

  • 将字节解码为消息——ByteToMessageDecoder 和 ReplayingDecoder
  • 将一种消息类型解码为另一种——MessageToMessageDecoder

因为解码器是负责将入站数据从一种格式转换到另一种格式,所以知道 Netty 的解码器实 现了 ChannelInboundHandler 也不会让你感到意外 什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 Channel- InboundHandler 转换入站数据时会用到 此外,得益于ChannelPipeline 的设计,可以将多个解码器连接在一起,以实现任意复杂的转换逻辑,这也是 Netty 是如何支持代码的模块化以及复用的一个很好的例子

2 抽象解码器ByteToMessageDecoder

2.1 示例

将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 特地为它提供了一个抽象的基类:ByteToMessageDecoder 由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理

ByteToMessageDecoderAPI

假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理 在这种情况下,你需要从入站ByteBuf中读取每个 int,并将它传递给ChannelPipeline 中的下一个 ChannelInboundHandler 为了解码这个字节流,你要扩展 ByteToMessageDecoder类(原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)

ToIntegerDecoder

每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int,然后将它添加到一个 List 中 当没有更多的元素可以被添加到该 List 中时,它的内容将会被发送给下一个 Channel- InboundHandler

ToIntegerDecoder类扩展了ByteToMessageDecoder

虽然ByteToMessageDecoder可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐 在下一节中, 我们将讨论 ReplayingDecoder,它是一个特殊的解码器,以少量的开销消除了这个步骤

2.2 源码解析

解码步骤

2.2.1 累加字节流

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //基于 ByteBuf 进行解码的,如果不是直接将当前对象向下传播
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                //若当前累加器为空,说明是第一次从 IO 流中读取数据
                first = cumulation == null;
                if (first) {
                    //第一次会将累加器赋值为刚读进来的 ByteBuf 对象数据
                    cumulation = data;
                } else {
                    //非第一次,则将当前累加器中的数据和读取进来的数据进行累加
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //调用子类的解码方法去解析
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

其中的cumulator

看一下这个MERGE_CUMULATOR

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            //当前的写指针后移一定字节,若超过最大容量,则进行扩容
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain().
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            //将当前数据写到累加器中
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

编解码器中的引用计数

对于编码器和解码器来说:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放 如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message)这将会增加该引用计数,从而防止该消息被释放

3 基于固定长度解码器分析

4 行解码器分析

5 基于分隔符解码器分析

6 基于长度域解码器参数分析

7 基于长度域解码器分析

8 解码器总结

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏喔家ArchiSelf

从构造函数看线程安全

线程是编程中常用而且强大的手段,在使用过程中,我们经常面对的就是线程安全问题了。对于Java中常见的数据结构而言,一般的,ArrayList是非线程安全的,Ve...

1132
来自专栏三好码农的三亩自留地

Java动态代理-实战

说动态代理之前,要先搞明白什么是代理,代理的字面意思已经很容易理解了,我们这里撇开其他的解释,我们只谈设计模式中的代理模式

3062
来自专栏ImportSource

Junit 5新特性全集

本文略长,但都是大白话,如果你能一口气看完,你赢了。 如果你来不及看这么长,那么建议你滑到文末,直接看黑体部分就知道大概了。 在5中的一个测试类的基本生命周期是...

49412
来自专栏mini188

java中的锁

java中有哪些锁 这个问题在我看了一遍<java并发编程>后尽然无法回答,说明自己对于锁的概念了解的不够。于是再次翻看了一下书里的内容,突然有点打开脑门的感觉...

4769
来自专栏Android 研究

OKHttp源码解析(五)--OKIO简介及FileSystem

okio是由square公司开发的,它补充了java.io和java.nio的不足,以便能够更加方便,快速的访问、存储和处理你的数据。OKHttp底层也是用该库...

2403
来自专栏菩提树下的杨过

rpc框架之 thrift 学习 2 - 基本概念

thrift的基本构架: ? 上图源自:http://jnb.ociweb.com/jnb/jnbJun2009.html 底层Underlying I/O以上...

2597
来自专栏我就是马云飞

LruCache源码解析

 今天我们来聊聊缓存策略相关的内容,LruCache应该说是三级缓存策略会使用到的内存缓存策略。今天我们就来扒一扒这里面的原理,同时也温故温故我们的数据结构方面...

2467
来自专栏iOS开发

iOS开发之 Method Swizzling 深入浅出

如果产品经理突然说:"在所有页面添加统计功能,也就是用户进入这个页面就统计一次"。我们会想到下面的一些方法:

4857
来自专栏Android知识点总结

Java总结IO篇之File类和Properties类

打开颜色选择器 :读流I-->字符串分割-->字符串存入Map-->使用Map对象还原用户配置 修改配置时 :写流O-->创建Map对象-->字符...

1842
来自专栏大内老A

如何解决EnterLib异常处理框架最大的局限——基于异常"类型"的异常处理策略

个人觉得EnterLib的EHAB(Exception Handling Application Block)是一个不错的异常处理框架,借助于EHAB,我们可以...

2145

扫码关注云+社区

领取腾讯云代金券