1、NIO存在的问题:
正是因为NIO存在这些问题,netty就应运而生了。
2、Netty简介: netty是一个异步的,基于事件驱动的网络应用框架。可以快速地开发高性能的服务器端和客户端,像dubbo和elasticsearch底层都用了netty。它具有以下优点:
官方下载地址:https://bintray.com/netty/downloads 我本次下载的版本是:4.1.51.Final
1、线程模型: 目前存在的线程模式:
根据Reactor的数量和1处理资源的线程数不同,又分3种:
Netty的线程模型是基于主从Reactor多线程做了改进。
2、传统阻塞IO的线程模型: 采用阻塞IO获取输入的数据,每个连接都需要独立的线程来处理逻辑。存在的问题就是,当并发数很大时,就需要创建很多的线程,占用大量的资源。连接创建后,如果当前线程没有数据可读,该线程将会阻塞在读数据的方法上,造成线程资源浪费。
3、Reactor模式(分发者模式/反应器模式/通知者模式): 针对传统阻塞IO的模型,做了以下两点改进:
Reactor模式的核心组成:
4、单Reactor单线程:
模型图
这个图其实就跟之前的NIO群聊系统对应。多个客户端请求连接,然后Reactor通过selector轮询判断哪些通道是有事件发生的,如果是连接事件,就到了Acceptor中建立连接;如果是其他读写事件,就有dispatch分发到对应的handler中进行处理。这种模式的缺点就是Reactor和Handler是在一个线程中的,如果Handler阻塞了,那么程序就阻塞了。
5、单Reactor多线程:
单reactor多线程
处理流程如下:
相比单Reactor单线程,这里将业务处理的事情交给了不同的线程去做,发挥了多核CPU的性能。但是Reactor只有一个,所有事件的监听和响应,都由一个Reactor去完成,并发性还是不好。
6、主从Reactor多线程:
主从reactor多线程
这个模型相比单reactor多线程的区别就是:专门搞了一个MainReactor来处理连接事件,如果不是连接事件,就分发给SubReactor进行处理。图中这个SubReactor只有一个,其实是可以有多个的,所以性能就上去了。
7、netty的模型: netty模型是基于主从Reactor多线程模型设计的,其工作流程如下:
netty模型图如下,对应了上面那段流程:
netty模型图
使用netty创建一个服务端与客户端,监听6666端口。
1、服务端:
public class NettyServer {
public static void main(String[] args) throws Exception {
// 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 创建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置启动参数
bootstrap.group(bossGroup, workGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 传入自定义的handler
sc.pipeline().addLast(new NettyServerHandler());
}
});
// 5. 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 6. 对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
}
// 数据读取完毕后
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是服务端", CharsetUtil.UTF_8));
}
// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2、客户端:
public class NettyClient {
public static void main(String[] args) throws Exception {
// 1. 创建事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 2. 创建启动对象
Bootstrap bootstrap = new Bootstrap();
// 3. 设置相关参数
bootstrap.group(eventLoopGroup) // 设置线程组
.channel(NioSocketChannel.class) // 设置通道
.handler(new ChannelInitializer<SocketChannel>() {
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new NettyClientHandler());
}
});
// 4. 连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
// 5. 监听通道关闭
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter{
// 通道就绪就被触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是客户端", CharsetUtil.UTF_8));
}
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到服务端消息:" + buf.toString(CharsetUtil.UTF_8));
}
// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
先启动服务端,然后启动客户端,就能看到服务端和客户端在控制台打印出来的消息了。
3、TaskQueue自定义任务: 上面服务端的NettyServerHandler的channelRead方法中,假如有一个非常耗时的业务,那么就会阻塞在那里,直到业务执行完。比如将NettyServerHandler的channelRead方法改成下面这样:
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 线程休眠10秒,模拟耗时业务
TimeUnit.SECONDS.sleep(10);
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
}
启动后会发现,10秒钟后,服务端才会收到客户端发送的消息,客户端也要10秒后,才会接收到服务端的消息,即服务端的channelReadComplete方法是要在channelRead方法执行完后才会执行的。
一直阻塞在那里也不是办法,希望可以异步执行,那我们就可以把该任务提交到该channel对应的NioEventLoop的TaskQueue中。有以下解决方案:
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
}
});
}
这里仍旧休眠10秒。启动服务端,再启动客户端,发现服务端要10s后才会打印出客户端发送的消息,但是客户端立刻就可以收到服务端在channelReadComplete方法里发送的消息,说明这次是异步的。
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
}
}, 5, TimeUnit.SECONDS);
}
启动服务端,然后启动客户端,发现客户端还是会立即收到服务端发出的消息,而服务端,首先要等待5秒才去执行run方法,run方法里面线程又休眠了10秒,所以总共要15秒后才会打印出客户端发送的消息。
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 传入自定义的handler
sc.pipeline().addLast(new NettyServerHandler());
// 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了
}
1、基本介绍:
// 判断当前操作是否完成
isDone
// 判断当前操作是否成功
isSuccess
// 获取操作失败的原因
getCause
// 判断当前操作是否被取消
isCancelled
// 注册监听器
addListener
2、使用监听器: 在NettyServer中的“启动并绑定端口”下面加上如下代码:
// 5. 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 注册监听器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
if (cf.isSuccess()) {
System.out.println("绑定端口成功");
} else {
System.out.println("绑定端口失败");
}
}
});
这样就注册了监听器,会监听绑定端口的结果,如果成功了,就会打印出绑定成功这段话。
开发一个Netty服务端,监听80端口,浏览器访问localhost,可以返回信息给浏览器。代码如下:
public class NettyHttpServer {
public static void main(String[] args) throws Exception {
// 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 创建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置启动参数
bootstrap.group(bossGroup, workGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
.childHandler(new NettyHttpServerInitializer());
// 5. 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(80).sync();
// 6. 对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {
// 向管道加入处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 1. 得到管道
ChannelPipeline pipeline = sc.pipeline();
// 2. 加入一个编码器解码器
pipeline.addLast("codec", new HttpServerCodec());
// 3. 增加一个自定义的handler处理器
pipeline.addLast("handler", new NettyHttpServerHandler());
}
}
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{
// 读取数据
@Override
protected void channelRead0(ChannelHandlerContext chc, HttpObject msg) throws Exception {
// 1. 判断msg是不是httpRequest请求
if (msg instanceof HttpRequest) {
System.out.println("msg类型:" + msg.getClass());
System.out.println("客户端地址:" + chc.channel().remoteAddress());
// 对特定资源不做响应
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了图标,不做响应");
return;
}
// 2. 创建回复给浏览器的信息
ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器,are you ok?", CharsetUtil.UTF_8);
// 3. 构造http响应
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 4. 将response返回
chc.writeAndFlush(response);
}
}
}
启动server后,在浏览器就访问localhost就可以返回相关内容了。