Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于传统的XML序列化工具,它更小、更快、更简单。支持结构化数据一次可以到处使用,包括跨语言,通过代码生成工具可以自动生成不同语言版本的源代码,可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。
其支持定义可选、必选字段。
添加依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
下载Protobuf
网址:https://developers.google.com/protocol-buffers/
google由于国内不能打开,所以有需要可以找我
解压:需要的就是这个protoc.exe可执行文件,需要用它根据配置生成需传输对象的代码。
生成类配置文件
我这边需要生成两个类,所以需要两个配置文件。
SubscribeReq.proto
syntax = "proto2"; //协议版本
package netty; //包名,其他 proto 可在此包下引用此 proto 的时候
option java_package = "com.liusy.protobuf"; //生成类的包名,注意:会在指定路径下按照该包名的定义来生成文件夹
option java_outer_classname = "SubscribeReqProto"; //生成类的类名
message SubscribeReq{
required int32 subReqId = 1; //required必须字段
optional string userName = 2; //optional不必须字段
required string productName = 3;
repeated string address = 4; //数组字段,0或多个
}
SubscribeResp.proto
syntax = "proto2";
package netty;
option java_package = "com.liusy.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
required int32 subReqId = 1;
required int32 respCode = 2;
required string desc = 3;
}
此时需要在cmd中输入命令 proto.exe -I="proto文件所在路径" --java_out="输出路径" "proto文件详细路径" 命令生成类
我的目录是这样的
所以我这边的命令就是
./protoc.exe -I=./ --java_out=./ SubscribeReq.proto
执行命令之后,就会在自定义的目录生成在proto文件里定义的目录以及类
然后将代码复制到项目中
小测试
生成代码之后使用比较简单
public class TestSubscribeReqProto {
//编码
private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
return req.toByteArray();
}
//解码
private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
return SubscribeReqProto.SubscribeReq.parseFrom(body);
}
private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqId(1);
builder.setUserName("tom");
builder.setProductName("表");
return builder.build();
}
public static void main(String[] args) throws InvalidProtocolBufferException {
SubscribeReqProto.SubscribeReq subscribeReq = createSubscribeReq();
System.out.println("before:"+subscribeReq.toString());
SubscribeReqProto.SubscribeReq decode = decode(encode(subscribeReq));
System.out.println("afterDecoder:"+decode.toString());
System.out.println(subscribeReq.equals(decode));
}
}
运行结果:
结合Netty使用
客户端:
public class SubServer {
private static final Log logger = LogFactory.getLog(SubServer.class);
public static void main(String[] args) {
new SubServer().start();
}
private void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//半包处理
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//需要解码的目标类
pipeline.addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new SubServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = server.bind(8080).sync();
logger.info("服务器端启动成功");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("服务器端启动失败",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
ProtobufDecoder只负责解码,如果想要处理半包信息,Netty提供了ProtobufVarint32FrameDecoder类,以及添加长度域的类ProtobufVarint32LengthFieldPrepender。也可以使用上一篇说到的LengthFieldBasedFrameDecoder和LengthFieldPrepender。
服务端处理器:
public class SubServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//由于ProtobufDecoder已经进行了解码,所以这边进行类型强转就行
SubscribeReqProto.SubscribeReq subscribeReq = (SubscribeReqProto.SubscribeReq) msg;
System.out.println("username:"+subscribeReq.getUserName()+"\nproductName:"+subscribeReq.getProductName());
ctx.writeAndFlush(resp(subscribeReq.getSubReqId()));
}
/**
*构建response返回给客户端
*/
public SubscribeRespProto.SubscribeResp resp(int id) {
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
builder.setSubReqId(id);
builder.setRespCode(200);
builder.setDesc("服务端已接收到信息");
return builder.build();
}
}
客户端:
public class SubClient {
private static final Log logger = LogFactory.getLog(SubClient.class);
public static void main(String[] args) {
new SubClient().start();
}
private void start() {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap client = new Bootstrap();
client.group(group)
.option(ChannelOption.TCP_NODELAY,true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new SubClientHandler());
}
});
ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
logger.info("客户端已启动");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("客户端启动失败",e);
} finally {
group.shutdownGracefully();
}
}
}
客户端处理器:
public class SubClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
*客户单启动时构建请求消息发送给服务端
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqId(1);
builder.setProductName("电脑");
builder.setUserName("tom来自客户端");
ctx.writeAndFlush(builder.build());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SubscribeRespProto.SubscribeResp subscribeResp = (SubscribeRespProto.SubscribeResp) msg;
System.out.println(subscribeResp.getDesc());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
客户端在启动后发送一个消息给服务端
先后启动服务端,客户端,运行结果如下:
服务端:
客户端:
上述就是Google的Protobuf编解码框架的使用。更多内容敬请关注公众号。