前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读

Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读

作者头像
小小工匠
发布2023-12-22 15:06:22
2090
发布2023-12-22 15:06:22
举报
文章被收录于专栏:小工匠聊架构
在这里插入图片描述
在这里插入图片描述

概述

在这里插入图片描述
在这里插入图片描述

Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。

ObjectEncoder

ObjectEncoder 是 Netty 中用于将对象编码为字节流的一种组件。在 Netty 的 pipeline 中,当你需要将某个对象发送到网络时,你可以使用 ObjectEncoder 来实现。它会将对象序列化为字节流,以便可以在网络中传输。 例如,当你使用 Netty 的 Bootstrap 类来配置你的客户端时,你可以为你的 channel pipeline 添加一个 ObjectEncoder

代码语言:javascript
复制
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectEncoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectEncoder 被添加到了 channel 的 pipeline 中,这样在数据传输过程中,发送的对象就会被自动编码为字节流。


ObjectDecoder

ObjectEncoder 相对应,ObjectDecoder 是用于将接收到的字节流解码为对象的组件。当你在 Netty 的 pipeline 中接收到字节流时,你可以使用 ObjectDecoder 来自动将字节流反序列化为对象。

继续上面的例子,如果你想在 pipeline 中添加 ObjectDecoder,你可以这样做:

代码语言:javascript
复制
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectDecoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectDecoder 被添加到了 channel 的 pipeline 中,这样在数据接收过程中,接收到的字节流就会被自动解码为对象。 总的来说,ObjectEncoderObjectDecoder 是 Netty 中用于对象序列化和反序列化的工具,它们让开发者可以更方便地在网络中传输对象。


Code

在这里插入图片描述
在这里插入图片描述

这段代码是一个简单的Netty服务器启动类

代码语言:javascript
复制
package com.artisan.codec.objectencoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 配置ServerBootstrap
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectDecoder
                            pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty server start。。");
            // 绑定端口并启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(4567).sync();
            // 等待服务器通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上述代码中,NettyServer类通过ServerBootstrap配置并启动了一个Netty服务器。服务器使用了两个事件循环组:一个用于处理连接(bossGroup),另一个用于处理已连接的通道(workerGroup)。 在initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectDecoder和自定义的处理器NettyServerHandlerObjectDecoder用于反序列化接收到的字节流为Java对象,NettyServerHandler用于处理业务逻辑。 服务器启动后,会绑定到指定端口(本例中为4567),并等待服务器通道关闭。在关闭服务器之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。 请注意,此代码片段仅作为Netty服务器启动的示例,实际应用中需要根据具体业务需求调整NettyServerHandler以实现相应的功能。


代码语言:javascript
复制
package com.artisan.codec.objectencoder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到客户端发送的消息时,执行该方法
        System.out.println("从客户端读取到Object:" + ((ArtisanSimple) msg).toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当发生异常时,执行该方法
        cause.printStackTrace();
        ctx.close();
    }
}

代码语言:javascript
复制
package com.artisan.codec.objectencoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 配置Bootstrap
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectEncoder
                            pipeline.addLast(new ObjectEncoder());
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty client start。。");
            // 连接到服务器
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 4567).sync();
            // 等待客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            group.shutdownGracefully();
        }
    }
}

在上述代码中,NettyClient类通过Bootstrap配置并启动了一个Netty客户端。客户端使用了一个事件循环组(group)来处理通道的连接和接收到的消息。

initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectEncoder和自定义的处理器NettyClientHandlerObjectEncoder用于将Java对象序列化为字节流,NettyClientHandler用于处理业务逻辑。

客户端启动后,会连接到指定IP地址(本例中为127.0.0.1)和端口(本例中为4567)的服务器,并等待客户端通道关闭。在关闭客户端之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。

请注意,此代码片段仅作为Netty客户端启动的示例,实际应用中需要根据具体业务需求调整NettyClientHandler以实现相应的功能。


这段代码是一个自定义的Netty处理器,名为NettyClientHandler。它继承自ChannelInboundHandlerAdapter,用于处理客户端接收到的消息和通道激活事件。

代码语言:javascript
复制
package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到服务器发送的消息时,执行该方法
        System.out.println("收到服务器消息:" + msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当通道激活时,执行该方法
        System.out.println("NettyClientHandler发送数据");
        // 测试对象编解码
        ArtisanSimple artisanSimple = new ArtisanSimple(1, "xxxx");
        ctx.writeAndFlush(artisanSimple);
    }
}

在上述代码中,NettyClientHandler类重写了channelReadchannelActive方法。 channelRead方法用于处理客户端接收到的服务器消息。在这个例子中,它将打印出接收到的消息。在实际应用中,你可以根据业务需求修改此方法以处理不同的消息类型和逻辑。

channelActive方法用于处理通道激活事件。在这个例子中,它将打印一条日志,并测试对象编解码功能。具体来说,它创建了一个ArtisanSimple对象,并通过ctx.writeAndFlush()方法将其发送到服务器。

在实际应用中,你可以根据需求修改此方法以实现不同的业务逻辑。

NettyClientHandler处理器需要与ObjectEncoderObjectDecoder配合使用,以确保发送和接收到的字节流能够正确地反序列化为Java对象。在客户端启动类NettyClient中,NettyClientHandler已经添加到了通道的pipeline中,因此可以处理发送和接收到的消息。


代码语言:javascript
复制
package com.artisan.codec.objectencoder;

import java.io.Serializable;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanSimple implements Serializable {

    private int id;
    private String name;

    public ArtisanSimple() {
    }

    public ArtisanSimple(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }


    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "ArtisanSimple{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }



}
代码语言:javascript
复制
package com.artisan.codec.objectencoder;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class AddressSimple {

    private String location;


    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }


    public AddressSimple() {
    }

    public AddressSimple(String location) {
        this.location = location;
    }


    @Override
    public String toString() {
        return "AddressSimple{" +
                "location='" + location + '\'' +
                '}';
    }
}

【测试】

在这里插入图片描述
在这里插入图片描述

源码分析

在这里插入图片描述
在这里插入图片描述

ObjectEncoder

这段代码定义了一个名为ObjectEncoder的类,它属于Netty网络通信框架的一部分,用于将Java对象序列化为字节流。

在这里插入图片描述
在这里插入图片描述

下面是对代码的详细分析以及增加的中文注释:

代码语言:javascript
复制
package io.netty.handler.codec.serialization; 

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; 
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * An encoder which serializes a Java object into a {@link ByteBuf}.
 * <p>
 * 请注意,此编码器产生的序列化形式与标准的{@link ObjectInputStream}不兼容。
 * 请使用{@link ObjectDecoder}或{@link ObjectDecoderInputStream}以确保与该编码器的互操作性。
 */
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    // 定义一个占位符,用于标记ByteBuf中对象序列化数据长度的位置
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    // 覆写encode方法,实现序列化逻辑
    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        int startIdx = out.writerIndex(); // 记录开始编码的位置
        // 创建一个ByteBufOutputStream包装器,用于向ByteBuf中写入数据
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        ObjectOutputStream oout = null;
        try {
            bout.write(LENGTH_PLACEHOLDER); // 先写入长度占位符
            // 创建一个紧凑型ObjectOutputStream,用于序列化对象
            oout = new CompactObjectOutputStream(bout);
            oout.writeObject(msg); // 将要序列化的对象写入流中
            oout.flush(); // 刷新输出流,确保所有数据都被写出
        } finally {
            // 关闭ObjectOutputStream和ByteBufOutputStream
            if (oout != null) {
                oout.close();
            } else {
                bout.close();
            }
        }
        int endIdx = out.writerIndex(); // 记录编码结束的位置
        // 设置占位符的长度,即实际序列化数据长度
        out.setInt(startIdx, endIdx - startIdx - 4);
    }
}

在上述代码中,ObjectEncoder类继承自MessageToByteEncoder,这意味着它是一个用于将某种类型消息编码成字节流的编码器。encode方法被重写以实现序列化过程。在这个方法中,首先通过ByteBufOutputStreamByteBuf写入了一个长度占位符,然后通过CompactObjectOutputStream将传入的Serializable对象序列化成字节流,并写入到ByteBuf中。最后,修改了长度占位符,将其设置为实际序列化数据的长度。

此代码片段使用@Sharable注解标记,表明这个ChannelHandler是可以共享给多个ChannelPipeline的。 序列化完成后,通过ObjectOutputStreamflush方法刷新流,确保所有数据都被写出。最后,在finally块中关闭输出流,确保资源被正确释放。


ObjectDecoder

这段代码定义了一个名为ObjectDecoder的类,它也是Netty网络通信框架的一部分,用于将接收到的字节流反序列化为Java对象。

在这里插入图片描述
在这里插入图片描述

下面是对代码的详细分析以及增加的中文注释:

代码语言:javascript
复制
package io.netty.handler.codec.serialization;
// 引入Netty相关类库
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 引入Java序列化相关类库
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
/**
 * A decoder which deserializes the received {@link ByteBuf}s into Java
 * objects.
 * <p>
 * 请注意,此解码器期望的序列化形式与标准的{@link ObjectOutputStream}不兼容。
 * 请使用{@link ObjectEncoder}或{@link ObjectEncoderOutputStream}以确保与该解码器的互操作性。
 */
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
    // ClassResolver用于加载序列化对象的类
    private final ClassResolver classResolver;
    /**
     * 创建一个新的解码器,其最大对象大小为1048576字节。
     * 如果接收到的对象大小大于1048576字节,将抛出StreamCorruptedException异常。
     *
     * @param classResolver  用于此解码器的ClassResolver
     */
    public ObjectDecoder(ClassResolver classResolver) {
        this(1048576, classResolver);
    }
    /**
     * 创建一个新的解码器,其最大对象大小为指定的值。
     *
     * @param maxObjectSize  序列化对象的最大字节长度。
     *                     如果接收到的对象的长度大于此值,将抛出StreamCorruptedException异常。
     * @param classResolver  用于加载序列化对象类的ClassResolver
     */
    public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
        super(maxObjectSize, 0, 4, 0, 4);
        this.classResolver = classResolver;
    }
    // 覆写decode方法,实现反序列化逻辑
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }
        // 创建一个紧凑型ObjectInputStream,用于反序列化对象
        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {
            return ois.readObject(); // 读取并返回反序列化的对象
        } finally {
            ois.close(); // 关闭输入流
        }
    }
}

在上述代码中,ObjectDecoder类继承自LengthFieldBasedFrameDecoder,这意味着它是一个用于解码具有长度字段帧的解码器。decode方法被重写以实现反序列化过程。在这个方法中,首先通过LengthFieldBasedFrameDecoder的解码方法获取到包含序列化数据的ByteBuf帧,然后通过CompactObjectInputStream将字节流反序列化为Java对象。

此代码片段使用了一个ClassResolver,它负责加载序列化对象的类,从而允许在反序列化过程中创建对象。反序列化完成后,通过ObjectInputStreamclose方法关闭输入流,确保资源被正确释放。


小结

ObjectEncoder和ObjectDecoder是Netty框架中的两个重要组件,它们分别负责将Java对象编码为字节流以及将字节流解码为Java对象。

ObjectEncoder是一个ChannelOutboundHandler,它主要负责将Java对象转换为字节流。当发送一个对象时,ObjectEncoder会根据对象的类型将其序列化为字节流,以便在网络上进行传输。ObjectEncoder通常与ObjectDecoder配合使用,以确保编码和解码过程能够正确地进行。

ObjectDecoder是一个ChannelInboundHandler,它主要负责将接收到的字节流解码为Java对象。当接收到字节流时,ObjectDecoder会根据字节流的类型进行反序列化,将字节流转换回原始的Java对象。ObjectDecoder通常与ObjectEncoder配合使用,以确保编码和解码过程能够正确地进行。

在实际应用中,ObjectEncoderObjectDecoder需要根据业务需求进行定制,以便正确地处理各种不同类型的对象。通过使用这两个组件,Netty框架可以在发送和接收消息时自动进行对象的编码和解码,简化了网络编程的复杂度。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
    • ObjectEncoder
      • ObjectDecoder
      • Code
      • 源码分析
        • ObjectEncoder
          • ObjectDecoder
          • 小结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档