前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty之MessagePack编解码框架

Netty之MessagePack编解码框架

作者头像
Liusy
修改2020-09-01 17:19:44
3550
修改2020-09-01 17:19:44
举报
文章被收录于专栏:Liusy01Liusy01

MessagePack

一个高效的二进制序列化格式。它让你像JSON一样可以在各种语言之间交换数据。但是它比JSON更快、更小的整数会被编码成一个字节,短的字符串仅仅只需要比它的长度多一字节的大小。

特点:编解码高效,性能高

     序列化之后的码流好

       支持跨语言

官网定义如下:

使用案例

添加依赖

代码语言:javascript
复制
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.23.2-GA</version>
</dependency>

定义传输对象

代码语言:javascript
复制
@Message
public class UserInfo {
    private int id;
    private String name;
    public UserInfo() {
    }
    //省略get/set方法
    @Override
    public String toString() {
        return "UserInfo{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

需要注意的是: (1)必须添加@Message注解,表明可以对此对象进行序列化 (2)必须具有空参构造器

编码器

代码语言:javascript
复制
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    private static final Log logger = LogFactory.getLog(MsgpackEncoder.class);
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        try {
            MessagePack messagePack = new MessagePack();
            byte[] write = messagePack.write(msg);
            out.writeBytes(write);
        } catch (Exception e) {
            logger.error("编码错误",e);
        }
    }
}

编码器需要继承MessageToByteEncoder抽象类,重写encode方法

解码器

代码语言:javascript
复制
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        byte[] b = new byte[length];
        msg.getBytes(msg.readerIndex(), b,0,length);
        MessagePack messagePack = new MessagePack();
        //也可以不进行强转,看业务需要
        out.add(messagePack.read(b,UserInfo.class));
    }
}

解码器需要实现MessageToMessageDecoder抽象类,重写decode方法

服务端

代码语言:javascript
复制
public class MsgpackServer {
    private static final Log logger = LogFactory.getLog(MsgpackServer.class);
    public static void main(String[] args) {
        new MsgpackServer().start();
    }
    private void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(65536,0,2,0,2));
                            pipeline.addLast(new MsgpackDecoder());
                            pipeline.addLast(new LengthFieldPrepender(2));
                            pipeline.addLast(new MsgpackEncoder());
                            pipeline.addLast(new MsgpackServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = server.bind(8080).sync();
            logger.info("服务器端启动成功");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("服务器端启动失败",e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

1、在解码器前添加了LengthFieldBasedFrameDecoder,用来处理半包,其有五个参数,分别是:

(1)maxFrameLength:消息的最大长度 (2)lengthFieldOffset:数据包存放消息长度值的开始下标

(3)lengthFieldLength:消息长度值所占长度 (4)lengthAdjustment:数据包长度 - lengthFieldOffset - lengthFieldLength - 长度域的值 ,一般是0

(5)initialBytesToStrip:将接收到的数据包去除前initialBytesToStrip位

比如接收一个消息“HelloWorld”,共10个字节,长度域的内容是16进制的值,那么就是0x000a,此时长度域占2个字节 数据包大小为:12B = 长度域2B+消息体10B 编码:

如果initialBytesToStrip不为2的话,那么在服务端就会只接受到消息体,而没有表示消息长度的内容

2、在编码器之前添加LengthFieldPrepender,它就是在要发送的ByteBuf前添加2个字节的消息长度

服务端处理器

代码语言:javascript
复制
public class MsgpackServerHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端信息:"+msg);
        ctx.write("服务端已接收到信息");
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

此处理器只用来输出消息内容,不对客户端发送消息,如果需要发送消息至客户端,可自 行添加业务逻辑处理。

客户端

代码语言:javascript
复制
public class MsgpackClient {
    private static final Log logger = LogFactory.getLog(MsgpackClient.class);
    public static void main(String[] args) {
        new MsgpackClient().start();
    }
    private void start() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                            pipeline.addLast(new MsgpackDecoder());
                            pipeline.addLast(new LengthFieldPrepender(2));
                            pipeline.addLast(new MsgpackEncoder());
                            pipeline.addLast(new MsgpackClientHandler());
                        }
                    });
            ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
            logger.info("客户端已启动");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("客户端启动失败",e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端也是需要添加半包处理器LengthFieldBasedFrameDecoder和添加长度域处理器LengthFieldPrepender

客户端处理器

代码语言:javascript
复制
public class MsgpackClientHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UserInfo userInfo = new UserInfo();
        userInfo.setId(1);
        userInfo.setName("tom");
        ctx.writeAndFlush(userInfo);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

在启动后发送一个UserInfo对象给服务端。

运行结果

分别启动服务端、客户端

以上就是MessagePack的使用,也涉及到TCP粘包/拆包的处理。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Liusy01 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MessagePack
  • 使用案例
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档