Java中的NIO基础知识

上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫

Java NIO 由3个核心组成,分别是Channels,Buffers,Selectors。本文主要介绍着三个部分。

Channel

所有的I/O都从一个Channel开始。通道与流不同,通道是双向的,流是单向的。

即可以从通道中读取数据,也可以写数据到通道里 。

读的话,是从通道读取数据到缓冲区,写的话是从缓冲区写入数据到通道。

四种通道:

  • FileChannel.从文件中读写数据
  • DatagramChannel.通过UDP协议,读写网络中的数据
  • SocketChannel,能通过TCP协议来读写网络中数据,常用于客户端
  • ServerSocketChannel。监听TCP连接,对每个新进来的连接会创建一个SocketChannel。

Buffer

Java NIO中的Buffer用于NIO通道进行交互。

缓冲区本质上一块可以写入数据,也可以从中读取数据的内存。也就是堆外内存,也叫直接内存。

当向Buffer写入数据时,Buffer会记录下写了多少数据,一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到度模式。

在读模式下,可以读取之前写入到Buffer的所有数据。

一旦读完了所有数据,就需要情况缓存区,让它可以再次被写入。有两种方式能清空缓冲区,调用clear()或者compact()方法。

clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

任何未读的数据将被移到缓冲区的起始处,新写入的数据将放大缓冲区未读数据的后面。

Buffer的capacity,position和limit

capacity

capacity作为一个内存块,buffer有一个固定的大小值,也叫capacity,只能向内存中写入byte,long,char等类型。一旦Buffer满了,需要将其清空。

position

当写数据到Buffer中是,position表示当前的位置。初始的position值为0,当一个byte,long等数据写到buffer后,position会向前移动到下一个可插入数据的单元。positon最大可谓capacity-1.

当读取数据时,也是从特定位置读。将Buffer从写模式切换到读模式,positon会被重置0,当从Buffer的position处读取数据时,position向前移动到想一个可以读的位置。

limit

在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。写模式下,limit等于buffer的capacity

Buffer的分配

要想获得一个Buffer对象首先要进行分配。 每一个Buffer类都有一个allocate方法。下面是一个分配48字节capacity的ByteBuffer的例子。

ByteBuffer buf = ByteBuffer.allocate(48);

Selector

 Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够检测到通道是否为读写事件准备好的的组件。所以Selector可以单个线程处理多个Channel。

为什么使用Selector

Selector能够使用一个线程来处理所有通道。但是对于如今的操作系统和CPU来说,多线程已经较过去效率高了很多。

Selector的创建

1.通过调用Selector.open()方法创建一个Selector

2.将Channel注册到Selector上配合使用,可使用Channel.register方法来实现,如下

            servChannel.configureBlocking(false);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);

 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着FileChannel与Selector不能一起使用,因为FileChannel不能切换到非阻塞模式。

3.通道触发意味着该事件已经就绪。Java中有如下常量对应着通道事件。

  • SelectionKey.OP_CONNECT(连接就绪):Channel成功连接到另一个服务器
  • SelectionKey.OP_ACCEPT(接收就绪):Channel准备好接收进入新的连接
  • SelectionKey.OP_READ(读就绪):Channel有数据可以读
  • SelectionKey.OP_WRITE(写就绪):Chanel有数据可以写

4.SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含interest集合,ready集合,Channel,Selector,附加的对象(可选)。

interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合。

ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。

用NIO创建的客户端与服务端:

服务端:

package com.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MutipleexerTimeServer implements Runnable{

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop = false;

    /**
     * 创建多路复用器,绑定NIO端口
     *
     * @param port
     */
    public MutipleexerTimeServer(int port){
        try{
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            servChannel.socket().bind(new InetSocketAddress(port),1024);
            System.out.println("the time server start at port: "+port );
        }catch (Exception e){
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop(){
        this.stop = stop;
    }

    @Override
    public void run() {
        while (!stop){
            try {
                // selector每隔一秒唤醒一次
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handlerInput(key);
                    }catch (Exception e){
                        if (key !=null){
                            key.cancel();
                            if (key.channel() !=null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册到上面的channel和pipe等资源都不被自动去注册并关闭,所有不需要重复释放资源
        if (selector!=null){
            try {
                selector.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private void handlerInput(SelectionKey key) throws Exception{
        if (key.isValid()){
            // 处理新接入的请求消息
            if (key.isAcceptable()){
                // Accept the new Connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                // 已完成TCP三次握手
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector,SelectionKey.OP_READ);
            }

            if (key.isReadable()){
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes>0){
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("server receive order: "+body);
                    String correntTime = "QUERY".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
                    doWrite(sc,correntTime);
                }else if (readBytes<0){
                     // 对端链路关闭
                    key.cancel();
                    sc.close();
                }else {
                    ;
                }
            }
        }
    }

    private void doWrite(SocketChannel channel,String response) throws Exception{
        if (response!=null && response.trim().length()>0){
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}


package com.nio;
/**
 * 启动类
 */
public class TimeServer {

    public static void main(String args[]){
        int port = 9816;
        MutipleexerTimeServer timeServer = new MutipleexerTimeServer(port);
        new Thread(timeServer,"nit").start();
    }
}

客户端:

package com.nio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author tangj
 * @date 2018/6/14 23:13
 */
public class TimeClientHandle implements Runnable{

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host,int port){
        this.host = host;
        this.port = port;
        try{
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        }catch (Exception e){
            e.printStackTrace();
            System.exit(-1);
        }
    }

    @Override
    public void run() {
        try{
            doConnect();
        }catch (Exception e){
            e.printStackTrace();
            System.exit(1);
        }

        while (!stop){
            try{
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handlerInput(key);
                    }catch (Exception e){
                        if (key!=null){
                            key.cancel();
                            if (key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册到上面的channel和pipe等资源都不被自动去注册并关闭,所有不需要重复释放资源
        if (selector!=null){
            try {
                selector.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private void handlerInput(SelectionKey key) throws Exception{
        if (key.isValid()){
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()){
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else {
                    // 连接失败
                    System.exit(1);
                }

            }
            if (key.isReadable()){
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes >0){
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("NOW IS: "+body);
                    this.stop = true;
                }else if (readBytes < 0){
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                }else {
                    ; //读到0字节,忽略
                }
            }
        }
    }

    private void doConnect() throws Exception{
        // 如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
        if (socketChannel.connect(new InetSocketAddress(host,port))){
            socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
        }else {
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException{
        byte[] req = "QUERY".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()){
            System.out.println("send order 2 server secceed");
        }
    }
}



package com.nio.client;

/**
 * @author tangj
 * @date 2018/6/14 22:56
 */
public class TimeClient {
    public static void main(String args[]){
        new Thread(new TimeClientHandle("127.0.0.1",9816)).start();
    }
}

参考:

并发编程网

《Netty权威指南》

代码地址:

github

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏芋道源码1024

【Netty 专栏】Netty 源码分析之 accept 过程

在Netty源码分析之NioEventLoop章节中,已经分析了NioEventLoop的工作机制,当有客户端connect请求,selector可以返回其对应...

1070
来自专栏javathings

Java NIO 实现网络通信

Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍...

2912
来自专栏蓝天

基于zookeeper的主备切换方法

继承CZookeeperHelper即可快速实现主备切换: https://github.com/eyjian/mooon/blob/master/mooo...

1292
来自专栏java 成神之路

Spring mvc ContextLoaderListener 原理解析

2055
来自专栏java 成神之路

NIO 之 Selector实现原理

3939
来自专栏个人分享

NIO源码阅读

  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

1044
来自专栏JavaEdge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

2824
来自专栏chenssy

【死磕Netty】-----NIO基础详解

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! Netty 是基于Java NIO 封装的网络通讯框架,只有充...

4546
来自专栏Ryan Miao

spring boot 添加拦截器

构建一个spring boot项目。 添加拦截器需要添加一个configuration @Configuration @ComponentScan(basePa...

4098
来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–含客户预付款的销售订单处理(201)-5发货

VL10C交货处理 在此活动中,创建交货。 后勤 ®后勤执行 ® 外向处理 ® 外向交货的发货 ® 外向交货 ® 创建 ® 交货凭证到期日的集中处理 ®销售订单...

3636

扫码关注云+社区

领取腾讯云代金券