前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈NIO

浅谈NIO

作者头像
曲水流觞
发布2019-10-27 21:27:37
5920
发布2019-10-27 21:27:37
举报
文章被收录于专栏:曲水流觞TechRill曲水流觞TechRill

浅谈NIO

说到NIO大家都不会陌生,它是JDK中提供的IO工具集。 它又被称作为New I/ONon Blocking I/O。相较于传统面向流的java.io,nio是完全面向缓冲的I/O,它提供了更底层的操作。

如果你了解C语言那么你一定接触过标准I/O库(stdio.h),其中实现的函数都是面向流的,而操作系统底层I/O函数(系统调用)都是基于缓冲去实现的(感兴趣可以参阅一些标准库的实现,例如glibc),标准I/O是在其上实现的高级API,它能对每个流的缓冲自动地进行管理,屏蔽掉了我们在管理缓冲上的复杂。实际上流也可以看作是一种很特殊的“缓冲”,比如将流看作一段段连续地缓冲块。

基于流的I/O是独立于操作系统的设计,它依赖操作系统底层的I/O模型,也带入了I/O阻塞的问题。值得一提的是它的出现比socket要早很多年,或者说在此之前I/O的阻塞可能都不构成一个问题。为什么这样说呢?我们先来讨论下什么是阻塞。

阻塞IO

我们都知道,CPU从寄存器中读取是最快的,其次CPU上的缓存,而读取磁盘相对来说是非常慢的。拿3.3GHz主频CPU为例,它的时钟周期为3ns,假设固态硬盘一次顺序读取需要50μs,那就是相当于16.6万个时钟周期,如果换算成秒的话CPU需要等待2天才有数据进入。

下图出自《Systems Performance》,表中列出一些事件的延迟时间。

所以调用一些直接与设备交互的函数(系统调用或者系统函数)CPU都会产生空闲,这种空闲会“阻塞”住进程。实际上大多数设备的操作,比如从硬盘上的文件系统中读取一个文本,除非是遇到硬件错误,它们都是能很快返回的。我们常常讨论的阻塞问题都是一些低速设备,例如网卡终端通道等,它们大多是一些被动的IO,如果不能从对端读取到数据很可能就一直阻塞下去。

阻塞类似于一个长时间的睡眠,在阻塞发生时进程处于一种假死状态,这时进程除了能被信号中断外CPU将不会继续往下执行指令。

考虑有以下服务器程序,serverSocket为服务器套接字实例,readMsg函数负责读取客户端套接字的逻辑。

代码语言:javascript
复制
while (true) {
   Socket socket = serverSocket.accpet();
   readMsg(socket);
}

我们可以使用telnet连接上这个服务,但如果我们什么都不输入,进程将被一直阻塞在readMsg函数。严重的是,我们的程序会成为了一对一的服务器程序。如果此时的连接不断开,其他用户试图连入时也会被阻塞住,这样的用户体验是非常糟糕的。

看到CPU会被读取函数阻塞住,可能有人就会想到现代CPU都是多核架构,我们可以使用其它核去建立一个新连接就解决了。确实,可以使用多线程去改进它,考虑到创建线程的开销我们会用到线程池。

代码语言:javascript
复制
while (true) {
   Socket socket = serverSocket.accpet();
   threadPool.submit(() -> readMsg(socket));
}

这样,我们就可以在新的线程中处理连接了,仅实现一个简单的web服务器也能有比较不错的性能。但是资源始终是有限的,如果处理请求的函数都是需要长时间等待的又或者根本就是恶意的连接,它们还是会占满所有的资源,后续连接依然会被阻塞。

在套接字(socket)的实现中,提供了相关的选项可以让发送端或接收端超时。它能让socket在超过指定时间没有收到响应就返回一个错误而不是一直阻塞。JDKSocket API也提供一个方法给套接字设置超时时间 - setSoTimeout(int),如果函数超出指定时间没有返回,那么将会抛出一个SocketTimeoutException,经过修改我们得到以下的加强版。

代码语言:javascript
复制
serverSocket.setSoTimeout(200);
while (true) {  
   Socket socket = null;
   try {
       socket = serverSocket.accpet();
   } catch (SocketTimeoutException e) {
       continue;
   }
   
   threadPool.submit(() -> {
       socket.setSoTimeout(50);
       readMsg(socket);
   });
}

通过设置超时时间得到一种非阻塞的假象,吞吐量得到稍微改善,但是依然没有本质上解决阻塞问题。

接下来我们来试试用NIO来解决阻塞带来的问题。

NIO

在JDK标准实现中,NIO提供了与传统IO完全不同的API来完成同样的事,它也提供了更多、更复杂的IO模型。NIO主要包括四大基本组件 - 通道(Channel)选择器(Selector)缓冲(Buffer)字符集(Charsets),本文会简单介绍前三个。

通道(Channel)

Nio为打开的IO设备提供了不同的抽象 - Channel,要用nio操纵一个设备需要一个Channel对象。

关于Channel的能力可以参阅官方文档中java.nio.channels包下的接口介绍。需要提一下的是ByteChannel,它实现了ReadableByteChannelWritableByteChannel,也就是说它同时具备读和写的能力,这是有别于的设计,因为大多数流得实现都只具备输入或输出中的一种能力。当然,从命名上已经能看出区别了,现实中的是单向的,而通道可以是双向的。在NIO中操纵套接字的SocketChannel也实现了ByteChannel,所以我们可以直接使用它读写套接字。

ServerSocketChannelSocketChannel这两个抽象类分别作为服务器套接字通道和客户端套接字通道的抽象,他们都继承了SelectableChannel,这关系着套接字通道的另外两项非常重要的能力 - 非阻塞I/O多路复用(multiplexing)。 我们先讨论下非阻塞I/O,它提供了一个configureBlocking(blooean)方法,它用于设置套接字操作是否阻塞。这意味着当在打开的ServerSocketSocket上设置非阻塞,之前会被阻塞的地方都能立即返回,例如,在ServerSocket使用accept,没有请求时会立即返回一个null

值得一提的是,ServerSocketChannelSocketChannel具体实现并没有包含在java.*包中而是在sun.nio.*,这部分的源码在Oracle提供的JDK中并没有公开。 感兴趣的同学可以去OpenJDK的源码中参照实现。

下面是我们使用NIO创建的一个服务端程序:

代码语言:javascript
复制
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
while (true) {
   SocketChannel socketChannel = null;
   while ((socketChannel = serverSocketChannel.accept()) == null) {
       // 没有请求进入时
   }
      
   socketChannel.configureBlocking(false);
   socketChannel.read(buff);
}

我这里是使用一个轮询(polling),这样的缺点是如果没有连接进入时,CPU就会不停的执行指令,通过top等工具我们也能很清楚的看到这时候CPU的负荷很高。由于大多数的服务器程序都不是计算密集型的,我们需要适当的让CPU空闲而不是一直去执行无意义的指令。这种模型是不适合直接在实际应用中使用的,一般它都会和多路复用模型搭配使用。

多路复用(Selector)

其实操作系统设计者们早就考虑到阻塞式I/O带的一些问题,所以在很早的时候操作系统中就有一种multiplexing的I/O模型。早在1983年发布的BSD4.2中就引入了系统调用select,值得一提的是这个版本还首次引入了socket套接字API,很难不让人联想它的存在就是为了解决套接字这类“低速”设备上的阻塞I/O问题的。

在Unix标准实现中提供的selectpoll系统调用都可以说否是多路复用的实现,它们提供了让内核通知进程I/O已准备就绪的能力,这样我们就能串行操纵多个打开I/O设备

比较有意思的是某些Unix操作系统中,比如LinuxBSD中都包含一些非标准的实现,它们具备更优的性能,感兴趣的同学可以参照epoll_create(2)kqueue(2)

在Jdk的实现中,java.nio提供一个名为Selector的抽象类,从名字可以看出它是具备类似于select系统调用的功能。与SocketChannel同样它的实现也在sun.nio.*包中,它会根据操作系统提供不同的实现,例如,在linux会使用epollBSD中会使用kqueue,从而提供更好的性能。

我们可以直接使用它提供的静态工厂方法即可创建selector:

代码语言:javascript
复制
Selector selector = Selector.open();

既然是要操作系统内核去通知进程,那自然需要有对应事件的处理。做过awt或者web的都应该清楚,如果要处理某一种事件(例如,点击一个表单上的按钮),就需要注册对应事件到事件监听器上。Selector也同样,我们需要对创建的套接字注册监听事件。

前文提到SocketChannel所继承的SelectableChannel是为套接字channel提供多路复用能力。

通过文档我们在SelectableChannel中找到一个SelectionKey register(Selector sel, int ops)方法。注意,这里要注册一个事件到Selector是使用SelectableChannel的能力。

  • 返回值SelectionKey代表着一个已经被注册到Selector中的SelectableChannel实例。
  • sel自然是需要我们传入上面创建的selector
  • ops代表需要监听的事件, 它们被定义在中SelectionKey中,例如需要监听accept事件只需要调用register(selector, SelectionKey.OP_ACCEPT),也可以传入多个事件,需要用或运算符|串起来 SelectionKey.OP_READ | SelectionKey.OP_WRITE,这是*nix系统函数的惯用传参法。

接下来使用Selector中的int select()方法,它将返回到来事件的个数。这个方法将会一直阻塞到有事件发生,所以一般会使用另一个带long参数的版本int select(long timeout)。它接收一个超时时间,当阻塞超时方法就会立即返回0

下面来演示一个多路复用服务端的实现:

代码语言:javascript
复制
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true ) {
   if (selector.select(TIME_OUT) == 0) {
       //...
       continue;
   }
   //...
}

当有事件到来时,例如有客户端连入,select()方法的返回事件个数使得程序跳出if分支。

这时候调用Selector.selectedKeys()会返回一个SelectionKey集合,代表着需要处理的事件。

SelectionKey中定义了一系列boolean is*()方法对了应事件类型,这样我们就可以根据事件类型定义不同的操作

代码语言:javascript
复制
while (true) {
   ...
   Set<SelectionKey> set = selector.selectedKeys();  
    
   set.forEach((selectionKey) -> {
       if (selectionKey.isAcceptable()) {
           try {
               SocketChannel socketChannel = serverSocketChannel.accept();
               socketChannel.configureBlocking(false);
               socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(16));
           } catch (IOException e) {
               //
           }
       }       
       if (selectionKey.isReadable()) {
           SocketChannel socketChannel = selectionKey.channel();
           ByteBuffer buf = (ByteBuffer) selectionKey.attachment();
           readMsg(socketChannel, buff);
       }
       ...
   });
   
   // 使用完毕需要清空
   set.clear();
}

在处理accept事件时我们可以把建立的客户端套接字也注册到Selector上,它使用SelectionKey.selector()获取与之关联的Selector。这里使用了之前register(Selector sel, int ops)的一个重载版本, 它在原本基础上还需要额外的第三个参数 - 一个Object数据,代表一个与SelectorKey相关联的附件。这样我们在下次需要处理读取写入事件时直接取出它即可,节省了每次创建缓冲的开销。

使用多路复用我们就可以在串行中处理很多个连接,当然这也不是没有上限,这个取决于操作系统中文件描述符的上限,虽然说这个值也是可以修改的-_-

缓冲(Buffer)

对于操作系统来说,I/O是一个非常昂贵的操作,比如在磁盘中读取,相较于随机访问内存(RAM)磁盘的访问是非常缓慢的过程。所以,操作系统的设计中有很多机制去优化读和写,例如,在读取一个文件时会事先将部分读入内存中,这个步骤一般由内核来完成的。内核的代码运行在内核空间,通常内核在读取数据时会事先将数据读入在内核空间的缓冲区中。

我们知道用户所编写的代码是运行在用户空间,是不能直接访问内核空间的,内核提供一种名为系统调用(syscall)的接口去完成内核空间的访问。

内核空间的执行代码的代价也是相当昂贵的,因为稍有不慎就可能导致系统崩溃,所以内核中在系统调用的实现中有很多的检查机制。大量使用系统调用会对我们的程序性能大打折扣,所以我们会考虑减少这类函数的访问。比如在I/O系统调用中使用一个内存缓冲区比如数组,这样可以避免每读取一个字节都调用一次系统函数,所以用户缓冲区的大小关联着程序的性能,当然性能也不是线性增长。

下表来自《The Linux Programming Interface》,作者对100m文本读取时使用不同大小缓冲区的测试结果:

BUF_SIZE

Elapsed

Total CPU

User CPU

System CPU

1

107.43

107.32

8.20

99.12

2

54.16

53.89

4.13

49.76

4

31.72

30.96

2.30

28.66

8

15.59

14.34

1.08

13.26

16

7.50

7.14

0.51

6.63

32

3.76

3.68

0.26

3.41

64

2.19

2.04

0.13

1.91

128

2.16

1.59

0.11

1.48

256

2.06

1.75

0.10

1.65

512

2.06

1.03

0.05

0.98

1024

2.05

0.65

0.02

0.63

4096

2.05

0.38

0.01

0.38

16384

2.05

0.34

0.00

0.33

65536

2.06

0.32

0.00

0.32

这也就是为什么很多编程语言中的IO处理函数都需要提供一个数组结构,IO本质上也就是用户空间内核空间的拷贝。扯远了,读取Channel中的数据也是需要提供一个缓冲区的。NIO中的缓冲是一个Buffer对象,简单点理解它底层就是一个数组,提供比数组更多的方式去管理和操作数组中的元素,但是它的实现也不是完全依赖于数组。

Buffer是一个抽象类,它有几种基本数据类型的实现,例如,HeapByteBufferHeapIntBuffer,它们的底层也维护着一个与之对应的数组结构,注意这个Heap所表达的含义,它对应了我们使用new运算符在Heap中所创建的数组。Buffer一般通过静态方法allocate去创建,当然创建方法还有可以一个数组入参的wrap(array)方法。

代码语言:javascript
复制
ByteBuffer byteBuff1 = ByteBuffer.allocate(1024);
ByteBuffer byteBuff2 = ByteBuffer.wrap(new byte[1024]);

在Java中数组是不能访问索引超过数组大小的元素,如果超过则抛出索引越界异常。Buffer提供get(ini)/put(int, T)方法用于获取或放置指定位置的数据。既然底层是数组,那么Buffer就有一个最大容量,它和底层数组的大小等价,是一个不会改变的值。在Buffer中有一个capacity属性代表着容量大小,它通过Buffer.capacity()方法直接获取。

Buffer还有一个核心概念limit,它的值可以通过Buffer.limit()方法直接获取,官方为其定义为第一个不可读/写的元素。也就是说从limit开始到capacity这段空间是不会被读取或写入的,用户只能访问索引从起始位置0limit - 1中的元素,使用大于(或等于)limit位置上的元素会抛出索引越界异常。limit的大小可以通过Buffer.limit(int)进行修改但不能超过capacitylimit初始值和capacity相等。

代码语言:javascript
复制
byte[] array = new byte[1024];
ByteBuffer byteBuff = ByteBuffer.wrap(array);
array.length == byteBuff.capacity(); // true
byteBuff.capacity() == byteBuff.limit(); // true
byteBuff.get(1023);byteBuff.limit(1023); // limit = 1023
byteBuff.get(1023); // java.lang.IndexOutOfBoundsException

在linux内核中维护着一个打开文件表,一个打开文件对应着表中的一条记录,其中维护打开文件的偏移量状态等信息。一些系统调用可以改变这些信息。比如,在read一个文件描述符(file descriptor)时会隐式将偏移量作调整,下次读取时就会从该位置开始操作。

下图为文件描述符表、打开文件表、inode表之间的关系:

Buffer中也有这么一个类似偏移量的概念叫做position,它的值可以通过Buffer.position()方法直接获取,官方对其的定义是下一个要读或写的索引值。每次读取都会改变position的值,但是无论如何都不会超过limit,也就是说当position抵达limit时就无法用这个Buffer实例读入或写出数据。可以通过Buffer.flip()方法将limit设置为当前position位置并将position初始化为0,此时就可以使用这个Buffer去完成操作,这个过程也叫做读写切换。

代码语言:javascript
复制
while (fileChannel.read(byteBuff) > 0) {
   int p = byteBuff.position();
   byteBuff.flip(); // limit = position; position = 0;
   p == byteBuff.limit(); // true
   process(byteBuff);
}

还有一个和position相关的核心概念mark,在读取数据的过程中可以通过Buffer.mark()mark的值置为positionmark的值没有办法直接获取,但可以通过使用reset()position重置为mark的值,reset()方法不能在mark()之前调用。

以上就是Buffer中的几个核心概念,它们的之间的关系为:

0⩽mark⩽position⩽limit

⩽capacity

多了Buffer这一层,我们就不用关心它们底层缓冲区具体是什么,通过实现一套API就能完成基本的I/O操作。比如,通过DirectByteBuffer.allocateDirect()创建的DirectByteBuffer,它使用了堆外内存实现Buffer。又或者用FileChannel.map()方法创建的MappedByteBuffer,它将文件直接映射到内存中,是使用内存映射实现的Buffer,也是零拷贝的一种实现。

Buffer有很多高级用法就不一一叙述了,毕竟本文也不是介绍API的文章。

推荐读物

写到最后,我要推荐(安利)几本个人觉得非常不错的读物:

首先当然是Steven大神的 《UNIX网络编程 卷1 + 卷2》,这是网络编程必读书目之一。

系统编程首先推荐 《Linux/Unix系统编程手册》,系统编程专家级读物。当然apue也非常不错,不过感觉这本可以与apue互补。

Java 网络编程 《Java TCP/IP Socket编程》,关于Java Socket API 和 NIO API做了很全面的介绍,虽然内容有点老。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 曲水流觞TechRill 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 浅谈NIO
  • 阻塞IO
  • NIO
    • 通道(Channel)
      • 多路复用(Selector)
        • 缓冲(Buffer)
        • 推荐读物
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档