Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能和高伸缩性的服务器和客户端。Netty 拥有高性能,吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
NIO 的缺点
Netty 的优点
模型解释
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类。
可以看出都是继承于AbstractBootStrap抽象类,所以大致上的配置方法都相同。
一般来说,使用Bootstrap创建启动器的步骤可分为以下几步:
服务端要使用两个线程组:
一般创建线程组直接使用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:
//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的 Channel 实例。
常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选:
还有就是同步阻塞的,一般没什么人用:
option() 设置的是服务端用于接收进来的连接,也就是 boosGroup 线程。option() 常用参数:
SO_BACKLOG //Socket 参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows 为200,其他为128。
childOption() 是提供给父管道接收到的连接,也就是 workerGroup 线程。childOption() 常用的参数:
SO_RCVBUF //Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY //TCP 参数,立即发送数据,默认值为 True。
SO_KEEPALIVE //Socket 参数,连接保活,默认值为 False。启用该功能时,TCP 会主动探测空闲连接的有效性。
ChannelPipeline 是 Netty 处理请求的责任链,ChannelHandler 则是具体处理请求的处理器。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
处理器 Handler 分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)。
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler,通过 ChannelHandlerContext 上下文对象,就可以拿到 Channel、Pipeline 等对象,就可以进行读写等操作。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler。两种类型的 Handler 互不干扰,相同类型的 Handler 的处理顺序是有影响的。
在 Bootstrap 中 childHandler() 方法需要初始化通道,实例化一个 ChannelInitializer,这时候需要重写 initChannel() 初始化通道的方法,装配流水线就是在这个地方进行。代码如下:
//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
自定义 NettyServerHandler 代码如下:
package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道 channel,管道 pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//将 msg 转成一个 ByteBuf,类似 NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕后的处理方法
*
* @param ctx 上下文对象, 含有通道 channel,管道 pipeline
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动,如果加上 sync() 方法则是同步。
// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 异步
// 给cf注册监听器,监听我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});
//释放掉所有的资源,包括创建的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
package com.chengzw.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty 服务端
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 bossGroup 和 workerGroup, 含有的子线程 NioEventLoop 的个数默认为cpu核数的两倍
// bossGroup 只是处理连接请求 ,真正的和客户端业务处理,会交给 workerGroup 完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1个线程
EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8个线程
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用 NioServerSocketChannel 作为服务器的通道实现,该类用于实例化新的 Channel 来接收客户端的连接
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start ...");
// 绑定一个端口并且同步, 生成了一个 ChannelFuture 异步对象,通过 isDone() 等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind 是异步操作,sync 方法是等待异步操作执行完毕
// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 异步
// 给cf注册监听器,监听我们关心的事件
/*channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
// 在这里面cf.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程,
// 即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。
// 原文的例子有英文注释:
// Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
// 线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver,
channelFuture.channel().closeFuture().sync();
} finally {
// 资源优雅释放
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端只需要一个 NioEventLoopGroup,其余代码和服务器类似。首先自定义 NettyClientHandler 用于处理客户端 ChannelPipeline 的业务。
package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
* @author 程治玮
* @since 2021/3/25 9:50 下午
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 当通道有读取事件时会触发,即服务端发送数据给客户端
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
然后配置客户端启动器:
package com.chengzw.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty 客户端
* @author 程治玮
* @since 2021/3/25 9:52 下午
*/
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start..");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}