前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty之Protobuf​编解码框架

Netty之Protobuf​编解码框架

作者头像
Liusy
修改2020-09-01 17:21:29
6940
修改2020-09-01 17:21:29
举报
文章被收录于专栏:Liusy01

Protobuf

Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于传统的XML序列化工具,它更小、更快、更简单。支持结构化数据一次可以到处使用,包括跨语言,通过代码生成工具可以自动生成不同语言版本的源代码,可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。

其支持定义可选、必选字段。

使用案例

添加依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.8.0</version>
</dependency>

下载Protobuf

网址:https://developers.google.com/protocol-buffers/

google由于国内不能打开,所以有需要可以找我

解压:需要的就是这个protoc.exe可执行文件,需要用它根据配置生成需传输对象的代码。

生成类配置文件

我这边需要生成两个类,所以需要两个配置文件。

SubscribeReq.proto

代码语言:javascript
复制
syntax = "proto2";  //协议版本
package netty;      //包名,其他 proto 可在此包下引用此 proto 的时候
option java_package = "com.liusy.protobuf"; //生成类的包名,注意:会在指定路径下按照该包名的定义来生成文件夹
option java_outer_classname = "SubscribeReqProto"; //生成类的类名

message SubscribeReq{
    required int32 subReqId = 1;  //required必须字段
    optional string userName = 2; //optional不必须字段
    required string productName = 3;
    repeated string address = 4; //数组字段,0或多个
}

SubscribeResp.proto

代码语言:javascript
复制
syntax = "proto2";
package netty;
option java_package = "com.liusy.protobuf";
option java_outer_classname = "SubscribeRespProto";

message SubscribeResp{
     required int32 subReqId = 1;
     required int32 respCode = 2;
     required string desc = 3;
}

此时需要在cmd中输入命令 proto.exe -I="proto文件所在路径" --java_out="输出路径" "proto文件详细路径" 命令生成类

我的目录是这样的

所以我这边的命令就是

代码语言:javascript
复制
 ./protoc.exe -I=./ --java_out=./ SubscribeReq.proto
 

执行命令之后,就会在自定义的目录生成在proto文件里定义的目录以及类

然后将代码复制到项目中

小测试

生成代码之后使用比较简单

代码语言:javascript
复制
public class TestSubscribeReqProto {

    //编码
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    //解码
    private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(1);
        builder.setUserName("tom");
        builder.setProductName("表");
        return builder.build();
    }

    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq subscribeReq = createSubscribeReq();
        System.out.println("before:"+subscribeReq.toString());
        SubscribeReqProto.SubscribeReq decode = decode(encode(subscribeReq));
        System.out.println("afterDecoder:"+decode.toString());
        System.out.println(subscribeReq.equals(decode));

    }
}

运行结果:

结合Netty使用

客户端:

代码语言:javascript
复制
public class SubServer {

    private static final Log logger = LogFactory.getLog(SubServer.class);

    public static void main(String[] args) {
        new SubServer().start();
    }

    private void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //半包处理
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            //需要解码的目标类
                            pipeline.addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new SubServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);

            ChannelFuture future = server.bind(8080).sync();
            logger.info("服务器端启动成功");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("服务器端启动失败",e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

ProtobufDecoder只负责解码,如果想要处理半包信息,Netty提供了ProtobufVarint32FrameDecoder类,以及添加长度域的类ProtobufVarint32LengthFieldPrepender。也可以使用上一篇说到的LengthFieldBasedFrameDecoder和LengthFieldPrepender。

服务端处理器:

代码语言:javascript
复制
public class SubServerHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //由于ProtobufDecoder已经进行了解码,所以这边进行类型强转就行
        SubscribeReqProto.SubscribeReq subscribeReq = (SubscribeReqProto.SubscribeReq) msg;
        System.out.println("username:"+subscribeReq.getUserName()+"\nproductName:"+subscribeReq.getProductName());
        ctx.writeAndFlush(resp(subscribeReq.getSubReqId()));
    }


    /**
    *构建response返回给客户端
    */
    public SubscribeRespProto.SubscribeResp resp(int id) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqId(id);
        builder.setRespCode(200);
        builder.setDesc("服务端已接收到信息");
        return builder.build();
    }
}

客户端:

代码语言:javascript
复制
public class SubClient {
    private static final Log logger = LogFactory.getLog(SubClient.class);

    public static void main(String[] args) {
        new SubClient().start();
    }

    private void start() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new SubClientHandler());
                        }
                    });
            ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
            logger.info("客户端已启动");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("客户端启动失败",e);
        } finally {
            group.shutdownGracefully();
        }

    }

}

客户端处理器:

代码语言:javascript
复制
public class SubClientHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    
    /**
    *客户单启动时构建请求消息发送给服务端
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(1);
        builder.setProductName("电脑");
        builder.setUserName("tom来自客户端");
        ctx.writeAndFlush(builder.build());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeRespProto.SubscribeResp subscribeResp = (SubscribeRespProto.SubscribeResp) msg;
        System.out.println(subscribeResp.getDesc());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

客户端在启动后发送一个消息给服务端

先后启动服务端,客户端,运行结果如下:

服务端:

客户端:

上述就是Google的Protobuf编解码框架的使用。更多内容敬请关注公众号。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Liusy01 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Protobuf
  • 使用案例
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档