一个高效的二进制序列化格式。它让你像JSON一样可以在各种语言之间交换数据。但是它比JSON更快、更小的整数会被编码成一个字节,短的字符串仅仅只需要比它的长度多一字节的大小。
特点:编解码高效,性能高
序列化之后的码流好
支持跨语言
官网定义如下:
添加依赖
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.23.2-GA</version>
</dependency>
定义传输对象
@Message
public class UserInfo {
private int id;
private String name;
public UserInfo() {
}
//省略get/set方法
@Override
public String toString() {
return "UserInfo{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
需要注意的是: (1)必须添加@Message注解,表明可以对此对象进行序列化 (2)必须具有空参构造器
编码器
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
private static final Log logger = LogFactory.getLog(MsgpackEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
try {
MessagePack messagePack = new MessagePack();
byte[] write = messagePack.write(msg);
out.writeBytes(write);
} catch (Exception e) {
logger.error("编码错误",e);
}
}
}
编码器需要继承MessageToByteEncoder抽象类,重写encode方法
解码器
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
final int length = msg.readableBytes();
byte[] b = new byte[length];
msg.getBytes(msg.readerIndex(), b,0,length);
MessagePack messagePack = new MessagePack();
//也可以不进行强转,看业务需要
out.add(messagePack.read(b,UserInfo.class));
}
}
解码器需要实现MessageToMessageDecoder抽象类,重写decode方法
服务端
public class MsgpackServer {
private static final Log logger = LogFactory.getLog(MsgpackServer.class);
public static void main(String[] args) {
new MsgpackServer().start();
}
private void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536,0,2,0,2));
pipeline.addLast(new MsgpackDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new MsgpackEncoder());
pipeline.addLast(new MsgpackServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = server.bind(8080).sync();
logger.info("服务器端启动成功");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("服务器端启动失败",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
1、在解码器前添加了LengthFieldBasedFrameDecoder,用来处理半包,其有五个参数,分别是:
(1)maxFrameLength:消息的最大长度 (2)lengthFieldOffset:数据包存放消息长度值的开始下标
(3)lengthFieldLength:消息长度值所占长度 (4)lengthAdjustment:数据包长度 - lengthFieldOffset - lengthFieldLength - 长度域的值 ,一般是0
(5)initialBytesToStrip:将接收到的数据包去除前initialBytesToStrip位
比如接收一个消息“HelloWorld”,共10个字节,长度域的内容是16进制的值,那么就是0x000a,此时长度域占2个字节 数据包大小为:12B = 长度域2B+消息体10B 编码:
如果initialBytesToStrip不为2的话,那么在服务端就会只接受到消息体,而没有表示消息长度的内容
2、在编码器之前添加LengthFieldPrepender,它就是在要发送的ByteBuf前添加2个字节的消息长度
服务端处理器
public class MsgpackServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到客户端信息:"+msg);
ctx.write("服务端已接收到信息");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
此处理器只用来输出消息内容,不对客户端发送消息,如果需要发送消息至客户端,可自 行添加业务逻辑处理。
客户端
public class MsgpackClient {
private static final Log logger = LogFactory.getLog(MsgpackClient.class);
public static void main(String[] args) {
new MsgpackClient().start();
}
private void start() {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap client = new Bootstrap();
client.group(group)
.option(ChannelOption.TCP_NODELAY,true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
pipeline.addLast(new MsgpackDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new MsgpackEncoder());
pipeline.addLast(new MsgpackClientHandler());
}
});
ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
logger.info("客户端已启动");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("客户端启动失败",e);
} finally {
group.shutdownGracefully();
}
}
}
客户端也是需要添加半包处理器LengthFieldBasedFrameDecoder和添加长度域处理器LengthFieldPrepender
客户端处理器
public class MsgpackClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UserInfo userInfo = new UserInfo();
userInfo.setId(1);
userInfo.setName("tom");
ctx.writeAndFlush(userInfo);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
在启动后发送一个UserInfo对象给服务端。
运行结果
分别启动服务端、客户端
以上就是MessagePack的使用,也涉及到TCP粘包/拆包的处理。