Java 的 NIO 是如何工作的?

在这个数据爆炸的时代,有大量的数据在系统中流动,一个应用系统的瓶颈往往都是 IO 瓶颈。传统的 javaIO 模型是 BIO,也就是同步阻塞 IO,数据在写入 OutputStream 或者从 InputStream 读取时,如果没有数据没有读到或写完,线程都会被阻塞,处于等待状态,直到数据读取完成或写入完成。而在网络编程中,每一个客户端连接发出后,服务端都会有一个对应线程来处理请求,服务器线程与并发数成 1:1 关系,然而一个服务器所能处理的线程是有限的,处理高并发时就会有问题。

NIO 是一种非阻塞同步 IO,它是一种 Reactor 模式的编程模型,简单来讲,就是当服务端有多个连接接入时,并不为每个连接单独创建线程,而是创建一个 Reactor 线程,用多路复用器来不断的轮询每一个接入的连接,找出有数据需要读写的连接进行后续操作,从而具有处理多个连接的能力。

java 原生的 NIO 实现有很多类和组件,但其核心组件有三个,其他的都是一些相关的工具类:

  • Channel    与 BIO 中的流不同,NIO 用 Chananl 来抽象数据通道,数据通过 Channel 来读取和写入,从 Channle 的类图来看,通道分为两大类:用于网络读写的 SelectableChannel 和用于文件读写的 FileChannel
  • Buffer     在 NIO 中,数据与 Channel 之间的交互是通过 buffer 来进行的,数据读写先经过 buffer 再进入通道
  • Selector   多路复用器 Selector 是 NIO 的基础。Selector 会不断轮询注册在其上面的 Channel,如果某个 Channel 上发生读或写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 集合,让后异步的将 Channel 的数据读入缓冲区

下面是一个简单 NIO 服务器,用来演示 NIO 的编程模型

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
 
import javax.swing.text.html.HTMLDocument.Iterator;
 
/**
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
* @author zale (zalezone.cn)
*/
public class SelectSockets {
    public static int PORT_NUMBER = 1234;
    public static void main(String[] argv) throws Exception 
    {
        new SelectSockets().go(argv);
    }
    public void go(String[] argv) throws Exception 
    {
        int port = PORT_NUMBER;
        if (argv.length > 0) 
        { // 覆盖默认的监听端口
            port = Integer.parseInt(argv[0]);
        }
        System.out.println("Listening on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();// 打开一个未绑定的serversocketchannel
        ServerSocket serverSocket = serverChannel.socket();// 得到一个ServerSocket去和它绑定 
        Selector selector = Selector.open();// 创建一个Selector供下面使用
        serverSocket.bind(new InetSocketAddress(port));//设置server channel将会监听的端口
        serverChannel.configureBlocking(false);//设置非阻塞模式
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将ServerSocketChannel注册到Selector
        while (true) 
        {
            // This may block for a long time. Upon returning, the
            // selected set contains keys of the ready channels.
            int n = selector.select();
            if (n == 0) 
            {
                continue; // nothing to do
            }           
            java.util.Iterator<SelectionKey> it = selector.selectedKeys().iterator();// Get an iterator over the set of selected keys
            //在被选择的set中遍历全部的key
            while (it.hasNext()) 
            {
                SelectionKey key = (SelectionKey) it.next();
                // 判断是否是一个连接到来
                if (key.isAcceptable()) 
                {
                    ServerSocketChannel server =(ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    registerChannel(selector, channel,SelectionKey.OP_READ);//注册读事件
                    sayHello(channel);//对连接进行处理
                }
                //判断这个channel上是否有数据要读
                if (key.isReadable()) 
                {
                    readDataFromSocket(key);
                }
                //从selected set中移除这个key,因为它已经被处理过了
                it.remove();
            }
        }
    }
    // ----------------------------------------------------------
    /**
    * Register the given channel with the given selector for the given
    * operations of interest
    */
    protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception
    {
        if (channel == null) 
        {
            return; // 可能会发生
        }
        // 设置通道为非阻塞
        channel.configureBlocking(false);
        // 将通道注册到选择器上
        channel.register(selector, ops);
    }
    // ----------------------------------------------------------
    // Use the same byte buffer for all channels. A single thread is
    // servicing all the channels, so no danger of concurrent acccess.
    //对所有的通道使用相同的缓冲区。单线程为所有的通道进行服务,所以并发访问没有风险
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    /**
    * Sample data handler method for a channel with data ready to read.
    * 对于一个准备读入数据的通道的简单的数据处理方法
    * @param key
    *
    A SelectionKey object associated with a channel determined by
    the selector to be ready for reading. If the channel returns
    an EOF condition, it is closed here, which automatically
    invalidates the associated key. The selector will then
    de-register the channel on the next select call.
 
    一个选择器决定了和通道关联的SelectionKey object是准备读状态。如果通道返回EOF,通道将被关闭。
    并且会自动使相关的key失效,选择器然后会在下一次的select call时取消掉通道的注册
    */
    protected void readDataFromSocket(SelectionKey key) throws Exception 
    {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;
        buffer.clear(); // 清空Buffer
        // Loop while data is available; channel is nonblocking
        //当可以读到数据时一直循环,通道为非阻塞
        while ((count = socketChannel.read(buffer)) > 0) 
        {
            buffer.flip(); // 将缓冲区置为可读
            // Send the data; don't assume it goes all at once
            //发送数据,不要期望能一次将数据发送完
            while (buffer.hasRemaining()) 
            {
                socketChannel.write(buffer);
            }
            // WARNING: the above loop is evil. Because
            // it's writing back to the same nonblocking
            // channel it read the data from, this code can
            // potentially spin in a busy loop. In real life
            // you'd do something more useful than this.
            //这里的循环是无意义的,具体按实际情况而定
            buffer.clear(); // Empty buffer
        }
        if (count < 0) 
        {
            // Close channel on EOF, invalidates the key
            //读取结束后关闭通道,使key失效
            socketChannel.close();
        }
    }
    // ----------------------------------------------------------
    /**
    * Spew a greeting to the incoming client connection.
    *
    * @param channel
    *
    The newly connected SocketChannel to say hello to.
    */
    private void sayHello(SocketChannel channel) throws Exception 
    {
        buffer.clear();
        buffer.put("Hi there!\r\n".getBytes());
        buffer.flip();
        channel.write(buffer);
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏SAP最佳业务实践

SAP最佳业务实践:MM–交货与库存调拨(134)-5发货

4.5 VL10B采购订单的交货到期清单 该活动创建库存调拨订单的交货。 角色:仓库文员 后勤 -后勤执行-外向处理- 外向交货的发货 -外向交货 -创建-交货...

38750
来自专栏SAP最佳业务实践

SAP最佳业务实践:MM–转包(138)-4发货过账

3.5 VL02N拣配确认(可选) 该活动拣配、包装并发运相关部件到供应商,此步骤只在拣配不需要 WM 调拨订单的情况下执行。 角色:仓库文员 后勤 -后勤...

34640
来自专栏社区的朋友们

TAF 必修课(二):Reactor多线程模型

最近看了很多文章和分享,非常受益, 实习所做项目主要用到了TAF,有必要对之前的学习做个梳理和总结,网络线程模型及请求接收过程,必修亦为基础、通用,故取其名。

1.1K10
来自专栏Linux驱动

Linux-使用patch命令给uboot打补丁(3)

patch:修改文件,让用户对原文件打补丁 用法   patch -p[剥离层级]  <[补丁文件] 打补丁示例: u-boot-1.1.6_jz2440.p...

32890
来自专栏YG小书屋

ES节点丢失导致实时数据导入速度特别慢

1.1K20
来自专栏专注于主流技术和业务

SpringMVC源码阅读:ContextLoaderListener初始化过程

ContextLoaderListener监听器的作用就是启动web容器时,自动装配ApplicationContext的配置信息。它实现了ServletCon...

33140
来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–外贸出口处理(118)-4发货

一、VL10C创建交货 1. 在 销售订单项目 屏幕上,进行以下输入: 字段名称用户操作和值注释装运点/接收点<装运点> 交货创建日期(从)<输入交货创建日期>...

449100
来自专栏清晨我上码

第八节 netty前传-NIO 几种channel介绍02

java bio中的serversocket和nio中的socket有些类似,两者使用可参考如下: BIO模式

9720
来自专栏JavaEdge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

29940
来自专栏实战docker

Docker下部署dubbo,消费者应用无法使用link参数的问题

在前一篇文章《Docker下dubbo开发,三部曲之一:极速体验》中,我们快速体验了部署在Docker环境下的dubbo服务,当时一共启动了四个容器,具体情况为...

30690

扫码关注云+社区

领取腾讯云代金券