目录
1、Netty 是一个 基于 NIO 的 client-server(客户端服务器)框架,使用它可以快速简单地开发网络应用程序。 2、它极大地简化并优化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。 3、支持多种协议 如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。 用官方的总结就是:Netty 成功地找到了一种在不妥协可维护性和性能的情况下实现易于开发,性能,稳定性和灵活性的方法。
除了上面之外,很多开源项目比如我们常用的 Dubbo、RocketMQ、Elasticsearch、gRPC 等等都用到了 Netty。
当需要连接客户端或者服务器绑定指定端口是需要使用Bootstrap
,ServerBootstrap
有两种类型,一种是用于客户端的Bootstrap,一种是用于服务端 的ServerBootstrap。不管程序使用哪种协议,无论是创建一个客户端还是服务器都需要使 用“引导”。
Bootstrap
是客户端的启动引导类/辅助类
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动引导/辅助类:
Bootstrap Bootstrap b = new Bootstrap();
//指定线程模型
b.group(group). ......
// 尝试建立连接
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
// 优雅关闭相关线程组资源
group.shutdownGracefully();
}
ServerBootstrap
客户端的启动引导类/辅助类
// 1.bossGroup 用于接收连接,workerGroup 用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.创建服务端启动引导/辅助类:
ServerBootstrap ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup, workerGroup). ......
// 6.绑定端口
ChannelFuture f = b.bind(port).sync();
// 等待连接关闭
f.channel().closeFuture().sync();
} finally {
//7.优雅关闭相关线程组资源
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
}
Bootstrap 通常使用 connet() 方法连接到远程的主机和端口,作为一个 Netty TCP 协议通信中的客户端。另外,Bootstrap 也可以通过 bind() 方法绑定本地的一个端口,作为 UDP 协议通信中的一端。 ServerBootstrap通常使用 bind() 方法绑定本地的端口上,然后等待客户端的连接。
Bootstrap 只需要配置一个线程组— EventLoopGroup,而 ServerBootstrap需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的处理。
分类 | Bootstrap | ServerBootstrap |
---|---|---|
网络功能 | 连接到远程主机和端口 | 绑定本地端口 |
EventLoopGroup 数量 | 1 | 2 |
一个 ServerBootstrap 可以认为有2个 Channel 集合,
第一个集合包含一个单例 ServerChannel,代表持有一个绑定了本地端口的 socket;
第二集合包含所有创建的 Channel,处理服务器所接收到的客户端进来的连接。
EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
Channel 和 EventLoop 直接有啥联系呢?
Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操作,两者配合参与 I/O 操作。
EventLoopGroup
包含多个EventLoop
,每个EventLoop通常内部包含一个线程。EventLoop在处理IO事件时在自己的Thread线程上进行,从而保证线程安全
NioEventLoopGroup在未指定线程数时,默认时当前cpu线程数*2
Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。
比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
Channel channel = ...; // 获取channel的引用
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); //1 ChannelFuture cf = channel.writeAndFlush(buf); //2
cf.addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) { //4
} });
channel声明周期 | 状态 | 描述 | | —- | —- | | ChannelUnregistered | Channel 已经被创建,但还未注册到EventLoop | | ChannelRegistered | Channel 已经被注册到了EventLoop | | ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 | | ChannelInactive | Channel 没有连接到远程节点 |
作用:
SelectonKey 状态
ChannelHandler是消息的处理器,负责读写操作和客户端连接等。
ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。 可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。 Netty 发送消息有两种方式。您可以直接写消息给 Channel 或写入 ChannelHandlerContext 对象。主要的区别是, 前一种方法会导致消息从 ChannelPipeline的 尾部开始,而 后者导致消息从 ChannelPipeline 下一个处理器开始。
ChannelHandler的子接口:
出站操作和数据将由ChannelOutboundHandler处理。它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。
public interface ChannelOutboundHandler extends ChannelHandler {
/**
当请求将Channel绑定到本地地址时被调用
/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
当请求将Channel连接到远程节点时被调用
/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
当请求将Channel从远程节点断开时被调用
/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
当请求关闭Channel时被调用
/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
当请求将Channel从它的EventLoop注销时被调用
/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
当请求从Channel读取更多的数据时被调用
/
void read(ChannelHandlerContext ctx) throws Exception;
/**
当请求通过Channel将数据写到远程节点时被调用
/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
当请求通过Channel将入队数据冲刷到远程节点时被调用
/
void flush(ChannelHandlerContext ctx) throws Exception;
}
/**
{@link ChannelHandler} which adds callbacks for state changes. This allows the user
to hook in to state changes easily.
/
public interface ChannelInboundHandler extends ChannelHandler {
/**
当Channel已经注册到它的EventLoop并且能够处理I/O时被调用
/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
当Channel从它的EventLoop注销并且无法处理任何I/O时被调用
/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
当Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪
/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
当Channel离开活动状态并且不再连接它的远程节点时被调用
/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
当从Channel读取数据时被调用
/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
当Channel上的一个读操作完成时被调用
/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个POJO被传经了ChannelPipeline
/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。可以通过调用Channel的isWritable()方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWater-Mark()方法来设置
/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
如果抛出一个可抛出的异常对象,则调用。
/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
一个channel对应一个pipeline,一个pipeline对应n个ChannelHandler
ChannelPipeline 调度 handler
ChannelHandlerContext
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。
ChannelHandlerContext有很多的方法,其中一些方法也存在于Channel和ChannelPipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* 返回绑定到这个实例的Channel
*/
Channel channel();
/**
* 返回调度事件的EventExecutor
*/
EventExecutor executor();
/**
* 返回这个实例的唯一名称
*/
String name();
/**
* 返回绑定到这个实例的ChannelHandler
*/
ChannelHandler handler();
/**
* 如果所关联的ChannelHandler已经被从ChannelPipeline中移除则返回true
*/
boolean isRemoved();
/**
* 触发对下一个ChannelInboundHandler上的fireChannelRegistered()方法的调用
*/
@Override
ChannelHandlerContext fireChannelRegistered();
/**
* 触发对下一个ChannelInboundHandler上的fireChannelUnregistered()方法的调用
*/
@Override
ChannelHandlerContext fireChannelUnregistered();
/**
* 触发对下一个ChannelInboundHandler上的fireChannelActive()方法的调用
*/
@Override
ChannelHandlerContext fireChannelActive();
/**
* 触发对下一个ChannelInboundHandler上的fireChannelInactive()方法的调用
*/
@Override
ChannelHandlerContext fireChannelInactive();
/**
* 触发对下一个ChannelInboundHandler上的fireExceptionCaught()方法的调用
*/
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
/**
* 触发对下一个ChannelInboundHandler上的fireUserEventTriggered()方法的调用
*/
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
/**
* 触发对下一个ChannelInboundHandler上的fireChannelRead()方法的调用
*/
@Override
ChannelHandlerContext fireChannelRead(Object msg);
/**
* 触发对下一个ChannelInboundHandler上的fireChannelReadComplete()方法的调用
*/
@Override
ChannelHandlerContext fireChannelReadComplete();
/**
* 触发对下一个ChannelInboundHandler上的fireChannelWritabilityChanged()方法的调用
*/
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
/**
* 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
*/
@Override
ChannelHandlerContext read();
/**
* 刷新所有挂起的消息。
*/
@Override
ChannelHandlerContext flush();
/**
* 返回这个实例所关联的ChannelPipeline
*/
ChannelPipeline pipeline();
/**
* 返回和这个实例相关联的Channel所配置的ByteBufAllocator
*/
ByteBufAllocator alloc();
/******************补充*************************/
//write 通过这个实例写入消息并经过ChannelPipeline
//writeAndFlush 通过这个实例写入并冲刷消息并经过ChannelPipeline
}