NIO (New lO)也有人称之为java non-blocking lO是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java lO API。
NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区
的、基于通道的IO操作。NIO将以更加高效
的方式进行文件的读写操作
。
NIO可以理解为非阻塞IO
,传统的IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事情,比如调用socket.read()时,如果服务器一直没有数据传输过来,线程就一直阻塞,而NIO中可以配置socket为非阻塞模式。
NIO相关类都被放在java.nio
包及子包下,并且对原java.io
包中的很多类进行改写。
NIO有三大核心部分: Buffer(缓冲区),Channel(通道),Selector(选择器)。
NIO Buffer对象
,并提供了一组方法,用来方便的访问该块内存与NIO通道进行交互
,数据是从通道读入缓冲区,从缓冲区写入通道中ByteBuffer最常用,ByteBuffer三个子类的类图如下
JVM堆
中分配数组
用来存放 Buffer 中的数据public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{
//在堆中使用一个数组存放Buffer数据
final byte[] hb;
...
}
allocate()
方法进行分配,在jvm堆上申请堆上内存ByteBuffer heapByteBuffer = ByteBuffer.allocate(1024);
堆外内存(操作系统内存)
中分配,jvm内存只保留堆外内存地址
public abstract class Buffer {
//堆外内存地址
long address;
...
}
allocateDirect()
方法进行分配,直接从系统内存中申请ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(1024);
ByteBuffer 有以下重要属性
不能更改
可以操作数据的大小
(limit 后数据不能进行读写) 下一个
要读取或写入的数据的索引 ByteBuffer写入和读取原理
@Test
public void simpleTest() {
// 1. 分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(10);
// 2. 利用put()存入数据到缓冲区中
buf.put("data".getBytes());
// 3. 切换读取数据模式
buf.flip();
// 判断缓冲区中是否还有元素
while (buf.hasRemaining()) {
// 4. 利用 get()读取单个字节
byte b = buf.get();
System.out.println("实际字节 " + (char) b);
}
// 清空缓冲区
buf.clear();
}
输出结果:
实际字节 d
实际字节 a
实际字节 t
实际字节 a
特别说明:compact方法,是把未读完的部分向前压缩,然后切换至写模式
位置相关
int capacity()
:返回 Buffer 的 capacity 大小int limit()
:返回 Buffer 的界限(limit) 的位置int position()
:返回缓冲区的当前位置 position@Test
public void test1() {
// 分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println(buf.position());// 0: 表示当前的位置为0
System.out.println(buf.limit());// 1024: 表示界限为1024,前1024个位置是允许我们读写的
System.out.println(buf.capacity());// 1024:表示容量大小为1024
System.out.println(buf.remaining());// 1024:表示position和limit之间元素个数
}
读写相关
put(byte b)
:将给定单个字节写入缓冲区的当前位置put(byte[] src)
:将 src 中的字节写入缓冲区的当前位置不会移动 position
)get()
:读取单个字节get(byte[] dst)
:批量读取多个字节到 dst 中不会移动 position
)@Test
public void test2() {
ByteBuffer buf = ByteBuffer.allocate(10);
// 默认写模式,写入数据
buf.put("abcde".getBytes());
System.out.println(buf.position());// 5: 当前位置5,表示下一个写入的位置是5
System.out.println(buf.limit());// 10: 表示界限为10,前10个位置是允许写入的
// 切换为读模式
buf.flip();
System.out.println(buf.position());// 0: 从0位置开始读取数据
System.out.println(buf.limit());// 5: 表示界限为5,前5个位置是允许读取的
// 读取两个字节
byte[] dst = new byte[2];
buf.get(dst);
System.out.println(new String(dst, 0, 2)); // 输出:ab
System.out.println(buf.position());// 2: 从2位置开始读取数据,因为0,1已经读取
System.out.println(buf.limit());// 5: 表示界限为5,前5个位置是允许读取的
// 根据索引读取,position不会移动
byte b = buf.get(3);
System.out.println((char) b); // 输出:d
System.out.println(buf.position());// 2: 依然是2,没有移动
}
切换模式相关
Buffer flip()
:将缓冲区的界限设置为当前位置, 并将当前位置重置为0(切换为读模式)Buffer clear()
:清空缓冲区(切换为写模式)Buffer compact()
:向前压缩未读取部分(切换为写模式)@Test
public void test3() {
ByteBuffer buf = ByteBuffer.allocate(10);
// 默认写模式,写入数据
buf.put("hello".getBytes());
// 切换为读模式
buf.flip();
// 读取两个字节
byte[] dst = new byte[2];
buf.get(dst);
System.out.println(buf.position());// 2: 当前位置2,前两个位置已经读取,读取下一个位置是2
System.out.println(buf.limit());// 5: 表示界限为5,前5个位置是允许读取的
// 向前压缩未读取,并切换为写模式
buf.compact();
System.out.println(buf.position());// 3: 当前位置3,因为之前有两个位置没有被读取,放到了最前面,写入的下一个位置是3
System.out.println(buf.limit());// 10: 表示界限为10,前10个位置是允许写入的
}
修改Buffer相关
标记相关
@Test
public void test4() {
ByteBuffer buf = ByteBuffer.allocate(10);
// 默认写模式,写入数据
buf.put("hello".getBytes());
// 切换为读模式
buf.flip();
// 读取两个字节
System.out.println((char) buf.get());
buf.mark();
System.out.println((char) buf.get());
System.out.println((char) buf.get());
buf.reset();
System.out.println((char) buf.get());
System.out.println((char) buf.get());
System.out.println((char) buf.get());
System.out.println((char) buf.get());
// hello读完再读,抛异常java.nio.BufferUnderflowException
// System.out.println((char) buf.get());
}
输出:
h
e
l
e
l
l
o
总结Buffer读写数据四个步骤
public class TestByteBufferString {
public static void main(String[] args) {
// 字符串转为ByteBuffer
// 方式一:put
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
// 方式二:Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
// 方式三:wrap
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
// ByteBuffer转为字符串
// 方式一:Charset
String str1 = StandardCharsets.UTF_8.decode(buffer1).toString();
// 方式二:String
String str2 = new String(buffer2.array(), 0, buffer2.limit());
}
}
传统流是单向
的,只能读或者写,而NIO中的Channel(通道)是双向
的,可以读操作,也可以写操作。
阻塞
模式下获取FileChannel
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有getChannel
方法。
FileInputStream
获取的 channel 只能读FileOutputStream
获取的 channel 只能写RandomAccessFile
是否能读写根据构造RandomAccessFile时的读写模式决定// 只能读
FileChannel channel1 = new FileInputStream("hello.txt").getChannel();
// 只能写
FileChannel channel2 = new FileOutputStream("hello.txt").getChannel();
// 以只读方式打开指定文件
FileChannel channel3 = new RandomAccessFile("hello.txt", "r").getChannel();
// 以读、写方式打开指定文件。如果该文件尚不存在,则尝试创建该文件
FileChannel channel4 = new RandomAccessFile("hello.txt", "rw").getChannel();
读取数据
int read(ByteBuffer dst)
:从Channel到中读取数据到ByteBuffer,返回值表示读到的字节数量
,-1
表示到达了文件的末尾
@Test
public void testRead() throws IOException {
// 获取只读文件通道
FileChannel channel = new RandomAccessFile("hello.txt", "r").getChannel();
// 创建字节缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 循环读取通道中的数据,并写入到 buf 中
while (channel.read(buf) != -1) {
// 缓存区切换到读模式
buf.flip();
// 读取 buf 中的数据
while (buf.position() < buf.limit()) {
// 将buf中的数据追加到文件中
System.out.println((char) buf.get());
}
// 清空已经读取完成的 buffer,以便后续使用
buf.clear();
}
// 关闭通道
channel.close();
}
写入数据
int write(ByteBuffer src)
:将ByteBuffer中的数据写入到Channel@Test
public void testRead() throws IOException {
// 获取写文件通道
FileChannel channel = new FileOutputStream("hello.txt").getChannel();
// 将ByteBuffer数据写到通道
channel.write(ByteBuffer.wrap("abc".getBytes()));
// 强制将数据刷出到物理磁盘
channel.force(false);
// 关闭通道
channel.close();
}
其他
long position()
:返回此通道的文件位置long size()
:返回此通道的文件的当前大小void force(boolean metaData)
:强制将所有对此通道的文件更新写入到存储设备中@Test
public void testOther() throws IOException {
// 获取写文件通道
FileChannel channel = new FileOutputStream("hello.txt").getChannel();
System.out.println(channel.position());// 0:当前位置为0,表示下次写入的位置为0
System.out.println(channel.size());// 0:文件大小为0
// 写入3个字符到 hello.txt 文件中
channel.write(ByteBuffer.wrap(("abc").getBytes()));
System.out.println(channel.position());// 3:当前位置为3,表示下次写入的位置为3
System.out.println(channel.size());// 3:文件大小为3,因为写入3个字符
channel.position(5);// 设置当前位置为5,表示下次写入的位置为5
// 再写入123,此时会跳过索引3和4,写入索引5
channel.write(ByteBuffer.wrap(("123").getBytes()));
// 将数据刷出到物理磁盘
channel.force(false);
// 关闭通道
channel.close();
}
输出结果:索引3和4的位置为空,这是应该特殊字符吧
/**
* 方法一(目标文件调用者)
*/
@Test
public void transferFrom() throws Exception {
// 1、字节输入管道
FileInputStream is = new FileInputStream("hello.txt"); // 源文件输入流
FileChannel fromChannel = is.getChannel();
// 2、字节输出流管道
FileOutputStream fos = new FileOutputStream("hello的副本.txt"); // 目标文件输出流
FileChannel toChannel = fos.getChannel();
// 3、复制
toChannel.transferFrom(fromChannel, fromChannel.position(), fromChannel.size());
fromChannel.close();
toChannel.close();
}
/**
* 方法二(资源文件调用者)
*/
@Test
public void transferTo() throws Exception {
// 1、字节输入管道
FileInputStream is = new FileInputStream("hello.txt"); // 源文件输入流
FileChannel fromChannel = is.getChannel();
// 2、字节输出流管道
FileOutputStream fos = new FileOutputStream("hello的副本.txt"); // 目标文件输出流
FileChannel toChannel = fos.getChannel();
// 3、复制
fromChannel.transferTo(fromChannel.position(), fromChannel.size(), toChannel);
fromChannel.close();
toChannel.close();
}
2g
大小的文件传输(因为超过2g,多出的部分会丢失)FileUtils.copyFile(final File srcFile, final File destFile)
方法的内部实现)@Test
public void transferFromBig() throws IOException {
// 使用try-with-resources语句确保流在使用完毕后被正确关闭
try (FileInputStream fis = new FileInputStream("hello.txt"); // 源文件输入流
FileChannel input = fis.getChannel(); // 获取源文件的文件通道
FileOutputStream fos = new FileOutputStream("hello的副本.txt"); // 目标文件输出流
FileChannel output = fos.getChannel()) { // 获取目标文件的文件通道
final long size = input.size(); // 获取源文件的大小
long pos = 0;
long count;
// 循环读取源文件内容,直到全部复制完毕
while (pos < size) {
// 计算剩余待复制的字节数
final long remain = size - pos;
// 根据剩余字节数决定本次要复制的字节数,最多30MB
count = remain > 1024 * 1024 * 30 ? 1024 * 1024 * 30 : remain;
// 从源文件通道复制数据到目标文件通道
final long bytesCopied = output.transferFrom(input, pos, count);
if (bytesCopied == 0) {
// 如果没有复制任何数据,跳出循环
break;
}
// 更新已复制的字节位置
pos += bytesCopied;
}
}
}
连接数过多
,必然导致OOM
,并且线程太多,反而会因为频繁上下文切换
导致性能降低不适合长连接,只适合短连接
服务端
accept()
方法,等待客户端的连接read()
方法,等待客户端的发送@Test
public void server() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建一个ServerSocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = serverSocketChannel.accept(); // 阻塞方法,线程停止运行
log.debug("connected... {}", sc);
channels.add(sc);
// 遍历连接集合
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,线程停止运行,等待客户端发消息读取
buffer.flip(); // 转为读模式
// 打印出响应信息
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear(); // 清空缓冲区,转为写模式
log.debug("after read...{}", channel);
}
}
}
客户端
@Test
public void client() throws IOException {
// 创建一个SocketChannel通道,并连接到本地的8080端口
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
socketChannel.write(ByteBuffer.wrap("a".getBytes()));
System.in.read();
}
null
,继续运行0
,但线程不会阻塞服务端
ServerSocketChannel和SocketChannel.configureBlocking(false)
即为非阻塞模式@Test
public void server() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
// 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
log.debug("connected... {}", socketChannel);
socketChannel.configureBlocking(false); // 非阻塞模式
channels.add(socketChannel);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
// 打印出响应信息
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
}
一个线程
,处理多个的客户端
连接,就会使用到Selector(选择器)没有事件发生,则处于阻塞状态
,防止cpu浪费//1. 获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 切换非阻塞模式
serverSocketChannel.configureBlocking(false);
//3. 绑定连接
serverSocketChannel.bind(new InetSocketAddress(9898));
//4. 获取选择器
Selector selector = Selector.open();
//5. 将通道注册到选择器上, 并且指定“监听接收事件”
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
监听的事件类型(SelectionKey四个int常量)
读
:SelectionKey.OP_READ (1)写
:SelectionKey.OP_WRITE (4)连接
:SelectionKey.OP_CONNECT (8)接收
:SelectionKey.OP_ACCEPT (16)若注册时不止监听一个事件,则可以使用“位或”
操作符连接
// 监听读和写事件
serverSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
多路复用
服务端
一个
服务端通道ServerSocketChannel和多个
SocketChannel客户端通道注册到selector上@Test
public void server() throws IOException {
// 1.获取管道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.设置非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3.绑定端口
serverSocketChannel.bind(new InetSocketAddress(8888));
// 4.获取选择器
Selector selector = Selector.open();
// 5.将通道注册到选择器上,并且开始指定监听的接收事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6.轮询已经就绪的事件
// select方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行,返回事件数量
// 事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则即使item.remove(),selector.select()还是会获取到没处理的事件
while (selector.select() > 0) {
System.out.println("开启事件处理");
// 7.获取选择器中所有注册的通道中已准备好的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// 8.开始遍历事件
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
System.out.println("客户端通道事件对象key:" + selectionKey);
// 9.判断这个事件具体是啥
if (selectionKey.isAcceptable()) { // 客户端接入事件
// 10.获取当前接入事件的客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 11.切换成非阻塞模式
socketChannel.configureBlocking(false);
// 12.将本客户端注册到选择器
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) { // 读事件
// 13.获取当前选择器上的读通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 14.读取
ByteBuffer buffer = ByteBuffer.allocate(1024);
/*
* read()方法的三种返回值
* 返回值大于0:读到了直接,对字节进行编解码
* 返回值等于0:没有读到字节,属于正常场景,忽略
* 返回值为-1:链路已经关闭,需要关闭SocketChannel释放资源
*/
int len = socketChannel.read(buffer);
if (len > 0) {
buffer.flip(); // 转为读模式
System.out.println(new String(buffer.array(), 0, len));
buffer.clear(); // 清空缓冲区,转为写模式
} else if(len < 0) {
// 如果读不到数据,取消事件
// 否则客户端断开时,len=-1,数据没有读取到也就是没有处理,会一直循环调用此读事件内容
selectionKey.cancel();
socketChannel.close();
}
}
// 15.处理完毕后,移除当前事件
it.remove();
}
}
}
客户端
public static void main(String[] args) throws Exception {
// 1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));
// 2、切换成非阻塞模式
sChannel.configureBlocking(false);
// 3、分配指定缓冲区大小
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4、发送数据给服务端
Scanner sc = new Scanner(System.in);
while (true) {
System.out.println("请说:");
String msg = sc.nextLine();
buf.put((msg).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
}
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
内部工作流程是这样的:
用户态
切换至内核态
,去调用操作系统的读能力,将数据读入内核缓冲区
。这期间用户线程阻塞,操作系统使用 DMA(可以理解为硬件单元)来实现文件读,其间也不会使用 cpu内核态
切换回用户态
,将数据从内核缓冲区
读入用户缓冲区
(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA用户缓冲区
(byte[] buf)写入socket 缓冲区
,cpu 会参与拷贝用户态
切换至内核态
,调用操作系统的写能力,使用 DMA 将socket 缓冲区
的数据写入网卡,不会使用 cpujava 的 IO 实际不是物理设备级别的读写,而是缓存的
复制
,底层的真正读写是操作系统
来完成的
3
次,这个操作比较重量级4
次直接访问使用
此内存的虚引用
用户态与内核态的切换次数与数据拷贝次数
3
次3
次sendFile
方法),java 中对应着两个channel调用 transferTo/transferFrom
方法拷贝数据用户态
切换至内核态
,使用 DMA将数据读入内核缓冲区
,不会使用 cpu内核缓冲区
传输到 socket 缓冲区
,cpu 会参与拷贝socket 缓冲区
的数据写入网卡,不会使用 cpu用户态与内核态的切换次数与数据拷贝次数
1
次3
次用户态
切换至内核态
,使用 DMA将数据读入内核缓冲区
,不会使用 cpusocket 缓冲区
,几乎无消耗内核缓冲区
的数据写入网卡,不会使用 cpu用户态与内核态的切换次数与数据拷贝次数
1
次2
次整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中。零拷贝适合小文件传输。