本文来源:http://8rr.co/9jsX
NIO 中的 ByteBuffer
Netty 中的 ByteBuf
通过2个位置指针来协助缓冲区的读写,读使用 readerIndex,写使用 writerIndex。
put 时,效验剩余空间,当容量超过限制后,通过 System.arraycopy 方法来对数组进行扩容操作,重建一个新的 ByteBuf,并将之前的 ByteBuf 复制到新的 ByteBuf 中。
使用 volcatile 记录引用次数,使用原子对象类型 AtomicIntegerFieldUpdater 来对其进行更新。
Heap Buffer(堆缓冲区)
Direct Buffer(堆外缓冲区)
Composite Buffer(复合缓冲区)
复合缓冲区表示一部分是堆缓冲区,一部分是堆外缓冲区
// 堆缓冲区
ByteBuf heapBuf = Unpooled.buffer(8);
// 堆外缓冲区
ByteBuf directBuf = Unpooled.drectBuffer(16);
// 复合缓冲区
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
// 将堆和堆外缓冲区都添加到复合缓冲区中
compBuf.addComponents(heapBuf, directBuf);
// 删除堆缓冲区
compBuf.removeComponent(0);
// 输出
Iterator<ByteBuf> iter = compBuf.iterator();
while(iter.hasNext()) {
System.out.println(iter.next().toString());
}
// 堆缓冲
ByteBuf heapBuf = Unpooled.buffer(8);
// 堆外缓冲
ByteBuf directBuf = Unpooled.directBuffer(16);
// 复合缓冲
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
基于内存池实现,提前申请一块内存,当需要ByteBuf的时候,就从中申请一片内存。与UnPooledDirectByteBuf基本一致,唯一不同就是内存分配策略。
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
ByteBuf buf = Unpooled.buffer(16);
for (int i = 0; i < 16; i++) {
buf.writeByte(i + 1);
}
// read
for (int i = 0; i < buf.capacity(); i++) {
System.out.println(buf.getByte(i));
}
private
static
class
InitializerHandler
extends
ChannelInitializer<SocketChannel> {
@Override
protected
void
initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 长度适配,会先将缓冲区中的数据控制住
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0,2));
// 对象解码,根据readableBytes放心从缓冲区中读取
pipeline.addLast(new MessagePackDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new MessagePackEncoder());
pipeline.addLast(new ServerHandler());
}
}
事件循环集合,并在此期间将 Channel 注册到 Selector 上。
基于 NIO Selector 实现
基于 NIO Selector 实现的组
(获取一个 Selector,一般使用 SelectorProvider 的 provider 方法,内部可以看到调用了SelectorProvider.provider())
类似一个事件线程池
负责提供 EventExecutor,控制生命周期,以及全局的状态
用于快捷启动客户端通道的类
构造类似 Bootstrap,用于快捷启动服务端通道的类。通过简单的配置来设置或“引导”程序的一个重要的类。
结构
标记接口,会接收对端发过来的请求,并创建真正的与客户端连接的 child Channel;
TCP/IP 的 ServerChannel
是 ChannelHandler 实例的列表,用于处理或截获通道的接收和发送数据
addLast(ChannelHandler handler):表示在 Pipeline 末尾添加 ChannelHandler
ChannelOption(选项) 来帮助引导配置,可用各种选项配置底层连接详细信息,比如:keep-alive(保持活跃),timeout(超时时间)等
Attributes(属性),传递一些属性,只能本机上传递,并不能相互传递,比如:将用户信息与通道关联起来
示例:
// 创建属键对象
final AttributeKey<Integer> id = AttributeKey.valueOf("ID");
// 客户端引导对象
Bootstrap b = new Bootstrap();
// 设置EventLoop,设置通道类型
b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
// 设置ChannelHandler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
System.out.println("Reveived data");
msg.clear();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
// 通道注册后执行,获取属性值
// 注意:如果在服务端获取,是获取不到该ID的值的
Integer idValue = ctx.channel().attr(id).get();
System.out.println(idValue);
}
});
// 设置通道Option
b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// 设置通道属性
b.attr(id, 123456);
ChannelFuture f = b.connect("localhost", 8080);
f.synUninterruptibly();
// 在任意的ChannelHandler的事件方法中调用
// 使用Channel
Channel channel = ctx.channel();
channel.write(Unpooled.copiedBuffer("Hello", CharsetUtil.UTF_8));
// 使用ChannelPipeline
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("World", CharsetUtil.UTF_8));
// 在指定的ChannelHandler下重写事件方法,并调用ChannelHandlerContext执行操作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(Unpooled.copiedBuffer("Hello", CharsetUtil.UTF_8));
ctx.flush();
}
线性流程
ChannelHandler 如果带有 @Sharable 注解,则可以被添加到多个 ChannelPipeline 中, 意味着单个 ChannelHandler 实例可以有多个 ChannelHandlerContext,此时需要注意 线程安全的问题。
ChannelInboundHandler
入站执行的Handler
InboundHandler 实现
ChannelOutboundHandler
出站执行的Handler
用于字节解码成消息,或字节解码成其他序列化字节,常用于将字节消息解码成POJO对象。
ChannelHandlerContext ctx, ByteBuf in, List<Object> out
将 ByteBuf 数据解码成其他形式的数据
例如:客户端接收到一个整型的字节码,服务器将数据读入 ByteBUf 并经过 ChannelPipeline 中的每个 Handler 进行处理
public
class
ToIntegerDecoder
extends
ByteToMessageDecoder
{
@Override
protected
void
decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
ReplayingDecoder 是 byte-to-message 解码的一种特殊的抽象基类,使用 ReplayingDecoder 无需检查缓冲区是否有 足够的字节。若字节足够,则正常读取;若没有足够的字节则会停止解码(这也是局限性)。
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
// 不需要判断是否有足够字节
out.add(in.readInt());
}
}
用于将消息对象转换成消息对象
ChannelHandlerContext ctx, I msg, List<Object> out
// 将接受的Integer消息转成String类型
public
class
IntegerToStringDecoder
extends
MessageToMessageDecoder<Integer> {
@Override
protected
void
decode(ChannelHandlerContext ctx, Integer msg,
List<Object> out)
throws Exception {
out.add(String.valueOf(msg));
}
}
将处理好的数据从转成字节码,以便在网络中传输。
ChannelHandlerContext ctx, I msg, ByteBuf out
// 将Integer值编码成byte[]
public
class
IntegerToByteEncoder
extends
MessageToByteEncoder<Integer> {
@Override
protected
void
encode(ChannelHandlerContext ctx, Integer msg,
ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}
消息编码成其他消息
ChannelHandlerContext ctx, I msg, List<Object> out
// 将Integer值编码成String
public
class
IntegerToStringEncoder
extends
MessageToMessageEncoder<Integer> {
@Override
protected
void
encode(ChannelHandlerContext ctx, Integer msg,
List<Object> out)
throws Exception {
out.add(String.valueOf(msg));
}
}
ChannelHandlerContext ctx, I msg, ByteBuf out
ChannelHandlerContext ctx, ByteBuf in, List<Object> out
ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out
ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out
结合编码器和解码器
// 解码器,将byte转成char
public
class
ByteToCharDecoder
extends
ByteToMessageDecoder
{
@Override
protected
void
decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
throws Exception {
while (in.readableBytes() >=2 ) {
out.add(Character.valueOf(in.readChar()));
}
}
}
// 编码器,将char转成byte
public
class
CharToByteEncoder
extends
MessageToByteEncoder<Character> {
@Override
protected
void
encode(ChannelHandlerContext ctx, Character msg,
ByteBuf out)
throws Exception {
out.writeChar(msg);
}
}
// 继承CombinedChannelDuplexHandler,绑定解码器和编码器
public
class
CharCodec
extends 继承CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public
CharCodec()
{
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}
TCP 是个 “流” 协议,就是没有界限的一串数据。TCP 底层并不了解上层业务数据的具体含义, 他会根据 TCP 缓冲区的实际情况进行包的划分,所以在业务上人为,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小包封装成一个大的数据包进行发送。
// 接收端每次read的缓冲字节是可控的,本质还是流
byte[] bytes = new
byte[5];
in.readBytes(bytes);
由于底层的 TCP 无法理解上层业务数据,所以只能通过协议设计来解决(类似前后端公约);
ChannelPipeline p = ch.pipeline();
p.addLast(new LineBasedFrameDecoder(1024));
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.copiedBuffer("$".getBytes())));
ChannelPipeline p = ch.pipeline();
p.addLast(new FixedLengthFrameDecoder(5));
...
与 LengthFieldPrepender 配合使用,常用设置如下(最好需要自己根据文档配置,非常灵活)
// MAX,0,2,0,2
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0,2));
pipeline.addLast(new MessagePackDecoder());
// 2
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new MessagePackEncoder());
存储结构类似于 JSON,但是性能比 JSON 更好
简单示例
// Create serialize objects.
List<String> src = new ArrayList<>();
src.add("msgpack");
src.add("kumofs");
src.add("viver");
MessagePack msgpack = new MessagePack();
// Serialize
byte[] raw = msgpack.write(src);
System.err.println(Arrays.toString(raw));
// Deserialize directly using a template
List<String> dst1 = msgpack.read(raw, Templates.tList(Templates.TString));
System.out.println(dst1.get(0));
System.out.println(dst1.get(1));
System.out.println(dst1.get(2));
// Or, Deserialze to Value then convert type.
Value dynamic = msgpack.read(raw);
List<String> dst2 = new Converter(dynamic)
.read(Templates.tList(Templates.TString));
System.out.println(dst2.get(0));
System.out.println(dst2.get(1));
System.out.println(dst2.get(2));
POJO
// 必须要加@Message注解
@Message
public class UserInfo implements Serializable {
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"name\":\"")
.append(name).append('\"');
sb.append(",\"age\":")
.append(age);
sb.append('}');
return sb.toString();
}
}
Encoder
public class MessagePackEncoder extends MessageToByteEncoder<UserInfo> {
@Override
protected void encode(ChannelHandlerContext ctx, UserInfo userInfo, ByteBuf out) throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(userInfo);
out.writeBytes(raw);
}
}
Decoder
public class MessagePackDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
byte[] bytes = new byte[in.readableBytes()];
// 方法1
in.readBytes(bytes);
// 方法2
// in.getBytes(in.readerIndex(), bytes, 0, length);
MessagePack messagePack = new MessagePack();
UserInfo value = messagePack.read(bytes, UserInfo.class);
System.err.println(value);
}
}
Netty 封装了 Protobuf 的编解码,非常方便
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 以下4个
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
WebSocket 是一个协议,而 Socket 是一个套接字的技术栈
GET
/chat
HTTP/1.1
Host:
server.example.com
Upgrade:
websocket
Connection:
Upgrade
Sec-WebSocket-Key:
dGh1IHNDFLUGLSJDF==
Origin:
http://example.com
Sec-WebSocket-Protocol:
chat,
superchat
Sec-WebSocket-Version:
13
HTTP/1.1
101
Switching
Protocols
Upgrade:
websocket
Connection:
Upgrade
Sec-WebSocket-Accept:
s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol:
chat
Netty 基于 HTTP 协议栈开发了 WebSocket 协议栈,可以很方便的开发 WebSocket 客户端和服务端。
public class MyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
handler(new LoggingHandler(LogLevel.INFO)).
childHandler(new WebSocketChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 将请求或应答编解码成 HTTP 消息
pipeline.addLast(new HttpServerCodec());
// 向客户端发送HTML5文件,主要用于支持浏览器和服务端进行WebSocket通信
pipeline.addLast(new ChunkedWriteHandler());
// 将HTTP消息的多个部分组合成一条完整的HTTP消息
pipeline.addLast(new HttpObjectAggregator(8192));
// 增加标识协议头
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 增加 WebSocket服务端的Handler
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
public static class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息: " + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded: " + ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved: " + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生");
ctx.close();
}
}
}
私有协议架构图
名称 | 类型 | 长度 | 描述 |
---|---|---|---|
crcCode | int | 32 | 1)0xABEF:固定值,表明是 Netty 协议消息,2个字节2)主版本号:1~255,1个字节3)次版本号:1~255,1个字节 |
length | int | 64 | 消息长度,包括消息头和消息体 |
sessionID | long | 64 | 集群节点内全局唯一,由会话ID生成器生成 |
type | Byte | 8 | 0:业务请求消息;1:业务响应消息;2:既是请求又是响应3:握手请求消息;4:握手应答消息;5:心跳请求消息6:心跳应答消息 |
priority | Byte | 8 | 消息优先级:0~255 |
attachment | Map | 变长 | 可选,用于扩展消息头 |
选择合适的序列化方式,以及编解码方式
时序图