首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

NIO快速入门案例

(1)概念原理

NIO(Non Blocking IO)又称同步非阻塞IO。服务器实现模式为把多个连接(请求)放入集合中,只用一个线程可以处理多个请求(连接),也就是多路复用。一个线程处理大量的客户端的请求,通过一个线程轮询大量channel,每次获取一批有事件的channel,然后对每个请求启动一个线程处理即可。

NIO API:网络SelectableChannel、SocketChannel、ServerSocketChannel、文件FileChannel

NIO 三大核心组件:

【Buffer缓冲区】

NIO的库中所有对象的读写都是用缓冲区处理的:读数据是从缓冲区读,写数据是写入缓冲区。buffer 底层就是个数组。

【Channel通道】

channel 类似于流,每个 channel 对应一个 buffer缓冲区。可以进行读写,与Stream的区别是:channel是全双工双向的,一个流只能是单向移动,channel既可以读也可以写。

【Selector多路复用器】

channel 会注册到 selector 上,由 selector 根据 channel 读写事件的发生将其交由某个空闲的线程处理、如果某个channel上有新的tcp连接接入读写事件,这个channel就处于就绪的状态,会被seclector轮询出来,然后通过SelectionKey可以获得就绪Channel的集合,进行后续的I/O操作。

Redis就是典型的基于epoll的NIO线程模型(nginx也是),epoll实例收集所有事件(连接与读写事件),把有数据交互的连接放入就绪事件列表中,由一个服务端线程连续处理所有就绪事件列表中的命令。

(2)NIOServer

public class NIOServer {

  public static void main(String[] args) throws IOException {

      //创建服务端socket通道 & 绑定host:port

      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(9999));

      //设置为非阻塞模式

      serverSocketChannel.configureBlocking(false);

      //新创建一个selector(其实可以为每一个channel单独创建一个selector)

      Selector selector = Selector.open();

      //将该通道注册到该selector上,并且注明感兴趣的事件,因为是服务端通道,所以只对accept事件感兴趣

      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

      while (true){

          //selector会帮我们去轮询,当前是否有我们感兴趣的事件发生,一直阻塞到有为止

          //select还有一个方法,可以指定阻塞时间,超过这个时间就会返回,此时可能返回的key个数为0

          selector.select();

          //若返回的key个数不为0,那么就可以一一处理这些事件

          Set<SelectionKey> selectionKeys = selector.selectedKeys();

          Iterator<SelectionKey> iterator = selectionKeys.iterator();

          while (iterator.hasNext()){

              SelectionKey selectionKey = iterator.next();

              //remove是为了下一次select的时候,重复处理这些已经处理过的事件

              //什么意思呢?其实selector.selectedKeys()返回来的set,就是其

              //内部操作的set,引用的是同一个set,所以我们如果不在外面remove已经

              //处理的事件,那么下一次,还会再次出现。需要注意的是,如果在外面对set

              //进行add操作,会抛异常,简单的说就是在外只删不增,在内只增不删。

              iterator.remove();

              if(selectionKey.isAcceptable()){

                  //SelectionKey.OP_ACCEPT事件

                  SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();

                  socketChannel.configureBlocking(false);

                  socketChannel.register(selector, SelectionKey.OP_READ);

              } else if(selectionKey.isReadable()){

                  //SelectionKey.OP_READ事件

                  //selectionKey.channel()返回的SelectableChannel是SocketChannel的父类所以可以直接强转

                  SocketChannel socketChannel = (SocketChannel)selectionKey.channel();

                  //NIO规定,必须要用Buffer进行读写

                  ByteBuffer buffer = ByteBuffer.allocate(32);

                  int len = socketChannel.read(buffer);

                  if(len == -1){

                      throw  new RuntimeException("连接已断开");

                  }

                  //上面那一步只是读到缓冲区,这里是从缓冲区真正的拿出数据

                  byte[] buf = new byte[len];

                  //read后,position的位置必定等于len,所以我们必须重置一下position为0,才可以从头开始读

                  //flip后,position=0, limit = len

                  buffer.flip();

                  buffer.get(buf);

                  //注册写事件

                  selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);

              } else if(selectionKey.isWritable()){

                  //SelectionKey.OP_WRITE事件

                  SocketChannel socketChannel = (SocketChannel)selectionKey.channel();

                  //写数据,也要用Buffer来写

                  int len = socketChannel.write(ByteBuffer.wrap("hi".getBytes()));

                  if(len == -1){

                      throw  new RuntimeException("连接已断开");

                  }

                  //这里为什么要取消写事件呢?因为只要底层的写缓冲区不满,就会一直收到这个事件所以只有想写数据的时候,才要注册这个写事件

                  selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);

              }

          }

      }

  }

}

(3)NIOClient

public class NIOClient {

  public static void main(String[] args) throws IOException, InterruptedException {

      //创建客户端socket通道 & 连接host:port

      SocketChannel socketChannel = SocketChannel.open();

      //设置为非阻塞模式

      socketChannel.configureBlocking(false);

      //非阻塞的形式连接服务器,如果直接使用open带参数的,连接的时候是阻塞连接

      socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));

      //新创建一个selector

      Selector selector = Selector.open();

      //将该通道注册到该selector上,并且注明感兴趣的事件

      socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

      while (true){

          selector.select();

          Set<SelectionKey> selectionKeys = selector.selectedKeys();

          Iterator<SelectionKey> iterator = selectionKeys.iterator();

          while (iterator.hasNext()){

              SelectionKey selectionKey = iterator.next();

              iterator.remove();

              //连接事件

              if(selectionKey.isConnectable()){

                  //看源码的注释可以知道,如果不使用带参数的open,那么需要手动调用这个方法完成连接

                  //如果是阻塞模式,该方法会阻塞到连接成功,非阻塞模式下,会立刻返回,已连接true,未连接false

                  if(socketChannel.finishConnect()){

                      //需要取消连接事件,否则会一直触发该事件,注册写事件

                      selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE);

                  }

              } else if(selectionKey.isReadable()){

                  ByteBuffer buffer = ByteBuffer.allocate(32);

                  int len = socketChannel.read(buffer);

                  if(len == -1){

                      throw  new RuntimeException("连接已断开");

                  }

                  byte[] buf = new byte[len];

                  buffer.flip();

                  buffer.get(buf);

                  selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);

              } else if(selectionKey.isWritable()){

                  int len = socketChannel.write(ByteBuffer.wrap("hello".getBytes()));

                  if(len == -1){

                      throw  new RuntimeException("连接已断开");

                  }

                  selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);

                  //这个只是控制一下发送数据的速度

                  Thread.sleep(1000);

              }

          }

      }

  }

}

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OjoYkTISPDxhgogmA28YB_bA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券