(1)概念原理
NIO(Non Blocking IO)又称同步非阻塞IO。服务器实现模式为把多个连接(请求)放入集合中,只用一个线程可以处理多个请求(连接),也就是多路复用。一个线程处理大量的客户端的请求,通过一个线程轮询大量channel,每次获取一批有事件的channel,然后对每个请求启动一个线程处理即可。
NIO API:网络SelectableChannel、SocketChannel、ServerSocketChannel、文件FileChannel
NIO 三大核心组件:
【Buffer缓冲区】
NIO的库中所有对象的读写都是用缓冲区处理的:读数据是从缓冲区读,写数据是写入缓冲区。buffer 底层就是个数组。
【Channel通道】
channel 类似于流,每个 channel 对应一个 buffer缓冲区。可以进行读写,与Stream的区别是:channel是全双工双向的,一个流只能是单向移动,channel既可以读也可以写。
【Selector多路复用器】
channel 会注册到 selector 上,由 selector 根据 channel 读写事件的发生将其交由某个空闲的线程处理、如果某个channel上有新的tcp连接接入读写事件,这个channel就处于就绪的状态,会被seclector轮询出来,然后通过SelectionKey可以获得就绪Channel的集合,进行后续的I/O操作。
Redis就是典型的基于epoll的NIO线程模型(nginx也是),epoll实例收集所有事件(连接与读写事件),把有数据交互的连接放入就绪事件列表中,由一个服务端线程连续处理所有就绪事件列表中的命令。
(2)NIOServer
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建服务端socket通道 & 绑定host:port
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(9999));
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//新创建一个selector(其实可以为每一个channel单独创建一个selector)
Selector selector = Selector.open();
//将该通道注册到该selector上,并且注明感兴趣的事件,因为是服务端通道,所以只对accept事件感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
//selector会帮我们去轮询,当前是否有我们感兴趣的事件发生,一直阻塞到有为止
//select还有一个方法,可以指定阻塞时间,超过这个时间就会返回,此时可能返回的key个数为0
selector.select();
//若返回的key个数不为0,那么就可以一一处理这些事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
//remove是为了下一次select的时候,重复处理这些已经处理过的事件
//什么意思呢?其实selector.selectedKeys()返回来的set,就是其
//内部操作的set,引用的是同一个set,所以我们如果不在外面remove已经
//处理的事件,那么下一次,还会再次出现。需要注意的是,如果在外面对set
//进行add操作,会抛异常,简单的说就是在外只删不增,在内只增不删。
iterator.remove();
if(selectionKey.isAcceptable()){
//SelectionKey.OP_ACCEPT事件
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if(selectionKey.isReadable()){
//SelectionKey.OP_READ事件
//selectionKey.channel()返回的SelectableChannel是SocketChannel的父类所以可以直接强转
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//NIO规定,必须要用Buffer进行读写
ByteBuffer buffer = ByteBuffer.allocate(32);
int len = socketChannel.read(buffer);
if(len == -1){
throw new RuntimeException("连接已断开");
}
//上面那一步只是读到缓冲区,这里是从缓冲区真正的拿出数据
byte[] buf = new byte[len];
//read后,position的位置必定等于len,所以我们必须重置一下position为0,才可以从头开始读
//flip后,position=0, limit = len
buffer.flip();
buffer.get(buf);
//注册写事件
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
} else if(selectionKey.isWritable()){
//SelectionKey.OP_WRITE事件
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//写数据,也要用Buffer来写
int len = socketChannel.write(ByteBuffer.wrap("hi".getBytes()));
if(len == -1){
throw new RuntimeException("连接已断开");
}
//这里为什么要取消写事件呢?因为只要底层的写缓冲区不满,就会一直收到这个事件所以只有想写数据的时候,才要注册这个写事件
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
}
}
}
}
}
(3)NIOClient
public class NIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
//创建客户端socket通道 & 连接host:port
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//非阻塞的形式连接服务器,如果直接使用open带参数的,连接的时候是阻塞连接
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//新创建一个selector
Selector selector = Selector.open();
//将该通道注册到该selector上,并且注明感兴趣的事件
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
//连接事件
if(selectionKey.isConnectable()){
//看源码的注释可以知道,如果不使用带参数的open,那么需要手动调用这个方法完成连接
//如果是阻塞模式,该方法会阻塞到连接成功,非阻塞模式下,会立刻返回,已连接true,未连接false
if(socketChannel.finishConnect()){
//需要取消连接事件,否则会一直触发该事件,注册写事件
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE);
}
} else if(selectionKey.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(32);
int len = socketChannel.read(buffer);
if(len == -1){
throw new RuntimeException("连接已断开");
}
byte[] buf = new byte[len];
buffer.flip();
buffer.get(buf);
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
} else if(selectionKey.isWritable()){
int len = socketChannel.write(ByteBuffer.wrap("hello".getBytes()));
if(len == -1){
throw new RuntimeException("连接已断开");
}
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
//这个只是控制一下发送数据的速度
Thread.sleep(1000);
}
}
}
}
}
领取专属 10元无门槛券
私享最新 技术干货