首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO核心组件

NIO核心组件

作者头像
shysh95
发布2020-06-04 16:45:58
4820
发布2020-06-04 16:45:58
举报
文章被收录于专栏:shysh95shysh95

上节我们讲述了NIO以及IO多路复用的的基础知识,本来这节是要讲述Reactor模式,但是我在写Reactor模式的时候,发现关于NIO的核心组件没有讲述,为了避免读者一头雾水,我们临时插入一片关于NIO核心组件的文章。

首先我直接给出NIO的核心组件,主要有以下三个:

  • Channel
  • Buffer
  • Selector
Channel

Java NIO中的所有I/O操作都基于Channel对象,就像流操作都要基于Stream对象一样。那么Channel到底是什么?

Channel其实代表的是和某一类实体的连接,这个实体可以是文件或者网络套接字等。Channel是Java NIO提供的一座桥梁,用于我们的程序可以和操作系统底层IO服务进行交互。

Channel可以将数据读取到NIO核心组件Buffer中,也可以将Buffer中的数据写入到Channel。这一点区别于Stream,Stream往往是单向的,但Channel是多向的可读可写。

目前的Channel实现主要有以下几种:

  • FileChannel:读写文件
  • DatagramChannel: UDP协议网络通信
  • SocketChannel:TCP协议网络通信
  • ServerSocketChannel:监听TCP连接
Buffer

NIO中所使用的缓冲区不是一个简单的byte数组,而是封装过的Buffer类,通过它提供的API,我们可以灵活的操纵数据。Java NIO提供了很多具体的Buffer类实现,如下图所示:

从上图可以看出,Buffer的实现还是比较丰富的,包含了Java中的基本数据类型。除了从类别上分,还可以在在存储位置上分为以下两种:

  • DirectByteBuffer
  • HeapByteBuffer

HeapByteBuffer是在堆上进行申请的,当我们实际进行IO操作时,需要将堆内内存的数据拷贝到堆外内存(GC管理不到的地方),由此可见DirectByteBuffer的创建效率在使用是少了一次内存数据的拷贝,但是在堆内申请空间比堆外申请空间要慢,所以说irectByteBuffer的创建效率HeapByteBuffer要差的。

那么为什么在实际IO操作时必须要通过堆外内存呢?原因如下:

当我们把一个内存地址通过JNI传递给底层的C库时有一个最基本的要求,就是这个地址上的内容不能发生改变,但是Java由于有GC的存在,受GC管理的对象将会在内存中产生移动,也就是说我把一个地址传递给底层的write,但是这段内存由于GC管理却失效了,所以必须要把待发送的数据放在一个GC管理不到的地方。这也就是为什么在调用native方法之前,数据一定要在堆外内存的原因。

上面讲述了Buffer的分类,下面我们看一下Buffer的关键属性和方法。Buffer中有三个重要的属性:

  • capacity (总容量)
  • position (指针当前位置)
  • limit (读/写边界位置)

在对Buffer进行读/写的过程中,position会往后移动,而limit就是position移动的边界。由此不难想象,在对Buffer进行写入操作时,limit应当设置为capacity的大小,而对Buffer进行读取操作时,limit应当设置为数据的实际结束位置。

同时Buffer也有几个重要的方法,更多的方法在我们的使用中会一一碰到,下面我们说几个方法:

  • flip(): 设置limit为position的值,然后position置为0。对Buffer进行读取操作前调用。
  • rewind(): 仅仅将position置0。一般是在重新读取Buffer数据前调用,比如要读取同一个Buffer的数据写入多个通道时会用到。
  • clear(): 回到初始状态,即limit等于capacity,position置0。重新对Buffer进行写入操作前调用。
  • compact(): 将未读取完的数据(position与limit之间的数据)移动到缓冲区开头,并将position设置为这段数据末尾的下一个位置。然后后续写入数据不会覆盖这段数据。
Select

Select在NIO/IO多路复用的文章中已经讲解了为什么要使用它,是为了解决NIO中不断检查数据状态空耗CPU的问题。具体的操作很简单:

//创建一个Select
this.selector = Selector.open();
// 打开一个ServerSocketChannel
ServerSocketChannel channel = ServerSocketChannel.open();
// 设置为非阻塞模式
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(port));
// 将Channel注册到Selector上,并说明自己感兴趣的事件,如果对多个事件感兴趣,那么使用|进行连接
channel.register(selector, SelectionKey.OP_ACCEPT);
简易实现

为了避免读者对上述方法不熟悉,下面给出了一段演示代码,生产不可使用。

public class NioServer {

    private final Selector selector;

    public NioServer(int port) throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(port));
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    dispatch(iterator.next());
                    iterator.remove();
                }
            } catch (IOException e) {
                System.out.println("nio server error." + e.getMessage());
            }
        }
    }

    private void dispatch(SelectionKey key) throws IOException {
        try {
            if (key.isAcceptable()) {
                // 新连接建立
                register(key);
            } else if(key.isReadable()) {
                readData(key);
            } else if (key.isWritable()) {
                writerData(key);
            }
        } catch (Exception e) {
            System.out.println("nio server dispatch task error." + e.getMessage());
            key.cancel();
        }
    }

    private void writerData(SelectionKey key) {
    }

    private void readData(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        if (buffer.hasRemaining()) {
            System.out.println(new String(buffer.array()).trim());
        }
        buffer.clear();
        //写回数据
        channel.register(selector, SelectionKey.OP_WRITE);
        channel.write(buffer);
    }

    private void register(SelectionKey key) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = channel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        NioServer nioServer = new NioServer(9090);
        nioServer.run();
    }
}


public class NioClient {

    public static void main(String[] args) {
        ExecutorService executors = Executors.newFixedThreadPool(500);
        for (int i = 0; i < 500; i++) {
            executors.submit(() -> {
                try {
                    SocketChannel socketChannel = SocketChannel.open();
                    socketChannel.connect(new InetSocketAddress("127.0.0.1", 9090));
                    ByteBuffer buffer = ByteBuffer.wrap(("hhh".getBytes()));
                    buffer.clear();
                    socketChannel.write(buffer);
                    ByteBuffer allocate = ByteBuffer.allocate(1024);
                    socketChannel.read(allocate);
                    System.out.println(new String(allocate.array()));
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
Reactor

在熟悉了Java NIO的核心组件后,我们下一节将会讲述Reacor模式。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员修炼笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Channel
  • Buffer
  • Select
  • 简易实现
  • Reactor
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档