Netty 系列六(编解码器).

一、概念

    网络传输的单位是字节,如何将应用程序的数据转换为字节,以及将字节转换为应用程序的数据,就要说到到我们该篇介绍的编码器和解码器。

    将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器,同时具有这两种功能的单一组件叫作编解码器。 Netty 提供了一系列用来创建所有这些编码器、解码器以及编解码器的工具,还可以按需定制通用的消息转换编解码器。

    Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。

    对于编码器和解码器来说,其过程也是相当的简单:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放。如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message)方法。这将会增加该引用计数,从而防止该消息被释放。

二、编码器

    编码器将应用程序的数据转换为网络格式,出站消息时被调用,主要有两类:

1、将消息编码为字节:MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter{}
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    
    /**
     * 1、类型为 I 的出站消息被编码为 ByteBuf
     * 2、该 ByteBuf 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Short aShort, ByteBuf byteBuf) throws Exception {
        byteBuf.writeShort(aShort);
    }
}

2、将消息编码为消息:MessageToMessageEncoder

public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    
    /**
     * 1、类型为 I 的出站消息被编码为目标类型 存入List 中
     * 2、该 List 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

三、解码器

    解码器将网络格式转换为应用程序的数据,入站消息时被调用。

    解码器比编码器多了一个 decodeLast 方法,原因是解码器通常需要在 Channel 关闭之后产生最后一个消息。这显然不适用于编码器的场景 —— 在连接被关闭之后仍然产生一个消息是毫无意义的。所以,当 Channel 的状态变为非活动时,这个方法将会被调用一次。可以重写该方法以提供特殊的处理。

    解码器主要有两类:

1、将字节解码为消息:ByteToMessageDecoder 和 ReplayingDecoder

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {}
public class MyDecoder extends ByteToMessageDecoder {

    private static final int MAX_FRAME_SIZE = 1024;

    /**
     * 1、该方法被调用时,将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List.
     * 2、对该方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者Butebuf 没有更多可读取的字节为止。
     * 3、List 的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int readableBytes = byteBuf.readableBytes();
        //不能让解码器缓冲大量的数据以致于耗尽可用的内存
        if (readableBytes > MAX_FRAME_SIZE){
            //跳过所有的可读字节
            byteBuf.skipBytes(readableBytes);
            throw new TooLongFrameException("数据超过可缓存字节...");
        }
        //假设需要解析 int 类型的消息(int 4个字节)
        if  (readableBytes > 4){
            list.add(byteBuf.readInt());
        }

    }
}

       ----分割线----

//类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder{}
public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    /**
     * 1、ReplayingDecoder 扩展了 ByteToMessageDecoder,并且自定义了 ByteBuf 的实现 ReplayingDecoderByteBuf。
     * 2、ReplayingDecoderByteBuf 对要转换的消息的字节数进行内部管理,如果没有足够的字节使用,将会抛出一个 Signal,由ReplayingDecoder进行处理。
     *
     * @param byteBuf 传入的 ByteBuf 实际上是 ReplayingDecoderByteBuf*/
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
       list.add(byteBuf.readInt());
    }
}

    继承 ByteToMessageDecoder 和继承 ReplayingDecoder 有什么区别呢?ReplayingDecoder 唯一的好处就是解码的时候不用进行字节数的判断,如上 ,因为它交由自定义的 ReplayingDecoderByteBuf 去处理了。但是 ReplayingDecoder 的效率稍慢于ByteToMessageDecoder。一般我们在这两个解码器中进行抉择的准则是:如果使用  ByteToMessageDecoder 不会引入太多的复杂性,那么请使用它;否则,请使用 ReplayingDecoder!

2、将消息解码为消息:MessageToMessageDecoder

public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
    
    /**
     * 1、对于每个需要被解码为另一种格式的入站消息来说,该方法将会被调用。
     * 2、解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception {
        list.add(String.valueOf(integer));
    }
}

四、编解码器

    Netty 的抽象编解码器类捆绑一个解码器/编码器对,主要用于在同一个类中管理入站和出站数据和消息的转换。

    个人觉得这个编解码器略显鸡肋呀,还是喜欢将编码器和解码器分开来写。因为 Netty 设计的一个基本准则就是:尽可能地将两种功能(编码器、解码器)分开,最大化代码的可重用性和可扩展性。

编解码器也主要有两类:

1、字节消息编解码器:ByteToMessageCodec

public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {}
public class MyBytetoMessageCodec extends ByteToMessageCodec<Short> {
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        while (in.readableBytes() > 4){
            out.add(in.readInt());
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);
    }
}

2、消息转换编解码器:MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelHandlerAdapter {}
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {

    /**
     * 1、对于每个 OUTBOUND_IN 类型的消息,这个方法都会被调用。
     * 2、这个消息将会被编码为 INBOUND_IN 类型的消息。
     * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = msg.getByteBuf().duplicate().retain();
        switch (msg.getFrameType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(byteBuf));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(byteBuf));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, byteBuf));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(byteBuf));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(byteBuf));
                break;
            case PING:
                out.add(new PingWebSocketFrame(byteBuf));
            default:
                break;
        }


    }

    /**
     * 1、传入 INBOUND_IN 类型的消息,该方法会被调用。
     * 2、这个消息会被解码为 OUTBOUND_IN 类型的消息。
     * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelInboundHandler
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, byteBuf));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, byteBuf));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, byteBuf));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, byteBuf));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, byteBuf));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, byteBuf));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }

    }


    public static final class MyWebSocketFrame {

        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }

        private final FrameType frameType;
        private final ByteBuf byteBuf;

        public MyWebSocketFrame(FrameType frameType, ByteBuf byteBuf) {
            this.frameType = frameType;
            this.byteBuf = byteBuf;
        }

        public FrameType getFrameType() {
            return frameType;
        }

        public ByteBuf getByteBuf() {
            return byteBuf;
        }
    }
}

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

rpc框架之 thrift 学习 2 - 基本概念

thrift的基本构架: ? 上图源自:http://jnb.ociweb.com/jnb/jnbJun2009.html 底层Underlying I/O以上...

2587
来自专栏程序员维他命

iOS 代码规范

花了一个月的时间结合几篇博客和书籍写了这套 iOS 代码规范(具体参考底部的参考文献部分)。这套代码规范除了有仅适用于 iOS 开发的部分,还有其他的比较通用性...

2162
来自专栏MasiMaro 的技术博文

windows 异常处理

为了程序的健壮性,windows 中提供了异常处理机制,称为结构化异常,异常一般分为硬件异常和软件异常,硬件异常一般是指在执行机器指令时发生的异常,比如试图向一...

1752
来自专栏androidBlog

java 解决文件名重复问题的两种方法

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

3991
来自专栏Android知识点总结

Java总结IO篇之File类和Properties类

打开颜色选择器 :读流I-->字符串分割-->字符串存入Map-->使用Map对象还原用户配置 修改配置时 :写流O-->创建Map对象-->字符...

1842
来自专栏coolblog.xyz技术专栏

自己动手实现一个简单的JSON解析器

JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。相对于另一种数据交换格式 XML,JSON 有着诸多优点。比如易读...

69119
来自专栏Android 研究

APK安装流程详解5——Installer、InstallerConnection和Installd守护进程

因为Installer继承自SystemService,所以我们看下Installer的onStart方法 代码在Installer.java 396行

1531
来自专栏jeremy的技术点滴

JVM的Finalization Delay引起的OOM

4068
来自专栏finleyMa

PHP7 新语法总结,更新7.2注意事项

太空船操作符用于比较两个表达式。当$a小于、等于或大于$b时它分别返回-1、0或1

4242
来自专栏芋道源码1024

Spring Webflux —— 源码阅读之 handler 包

查找给定请求的handler,如果找不到特定的请求,则返回一个空的Mono。这个方法被getHandler(org.springframework.web.se...

3685

扫码关注云+社区

领取腾讯云代金券