前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO初探

NIO初探

作者头像
黑洞代码
发布2021-01-14 15:37:32
3150
发布2021-01-14 15:37:32
举报

1.缓冲区

缓冲区Buffer是一个对象,它包含要写入和读出的数据。在NIO中加入Buffer对象是与BIO的一个重要区别。NIO中所有数据都是通过缓冲区处理的。缓冲区实质上是一个数组,缓冲区提供对数据的结构化访问和维护读写位置等信息。

ByteBuffer:字节缓冲区

CharBuffer:字符缓冲区

ShortBuffer:短整型缓冲区

IntBuffer:整型缓冲区

LongBuffer:长整型缓冲区

FloatBuffer:浮点型缓冲区

DoubleBuffer:双精度浮点型缓冲区

Boolean型没有对应的缓冲区

这些缓冲区都继承自Buffer

2.Channel

Channel是一个通道,网络数据通过Channel进行读写。

通道与流的区别:

流是一个方向上的移动(输入流或者输出流)

通道是双向的,通道可读可写并可以同时进行

Channel可以分为两大类:

SelectableChannel:用于网络读写

FileChannel:用于文件操作

3.多路复用器

多路复用器提供选择已经就绪的任务的能力。Selector会轮询注册在其上的Channel,如果某个Channel发生了读写时间,这个Channel就处于就绪态,就会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel集合,进行后续的IO操作。

一个Selector可以同时轮询多个Channel,JDK只用epoll代替传统的select实现,所以只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

4.NIO服务端时序图

Netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端Channel,由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁IO阻塞导致的线程挂起。另外,由于Netty采用了异步通信模式,一个IO线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞IO一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

下面用netty实现一个客户端和服务端之间的通信

代码语言:javascript
复制
public class NioServer {
    public static void main(String[] args) {
        //多路复用类
        NioServerTask nioServerTask = new NioServerTask(8888);
        //单独的线程维护多路复用器
        new Thread(nioServerTask).start();
    }
}

这个main函数中主要的逻辑就是new一个线程,在线程中提交一个任务。看下NioServerTask代码如下:

代码语言:javascript
复制
public class NioServerTask implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean stop;

    /**
     * 初始化多路复用器,绑定端口
     * @param port
     */
    public NioServerTask(int port) {

        try{
            //多路复用器
            selector = Selector.open();
            //打开ServerSocketChannel, 监听客户端连接
            serverSocketChannel = ServerSocketChannel.open();
            //非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //监听端口
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            // ServerSocketChannel注册到多路复用器Selector上,监听ACCEPT事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器启动端口:" + port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void stop () {
        this.stop = true;
    }

    @Override
    public void run() {
        //轮询就绪的key
        while (!stop) {
            try {
               selector.select(1000);
               Set<SelectionKey> selectionKeys = selector.selectedKeys();
               Iterator<SelectionKey> iterator = selectionKeys.iterator();
               SelectionKey key = null;
               while (iterator.hasNext()) {
                   key = iterator.next();
                   iterator.remove();
                   try {
                       //处理key
                       handleKey(key);
                   } catch (Exception e) {
                       if (key != null) {
                           key.cancel();
                           if (key.channel() != null) {
                               key.channel().close();
                           }
                       }
                   }
               }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理key
     * @param key
     * @throws IOException
     */
    private void handleKey(SelectionKey key) throws IOException {
        if (key.isValid()) {
            //监听到有新的客户端接入
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                //完成TCP三次握手,TCP物理链接建立
                SocketChannel socketChannel = serverSocketChannel.accept();
                //客户端设置为非阻塞
                socketChannel.configureBlocking(false);
                //新的客户端注册到多路复用器Selector上,监听读操作,读取客户端发送的消息
                socketChannel.register(selector, SelectionKey.OP_READ);
            }

            //  读取数据
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                //1M的缓冲区
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                //读取客户端请求到缓冲区
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                    //当前缓冲区limit设置为position,position设置为0,便于后续对缓冲区的读取
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes);
                    System.out.println("获取客户端输入:" + body);
                    doWrite(socketChannel, "当前时间是:" + new Date());
                } else if (readBytes < 0) {
                    //关闭
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

    /**
     * 发送响应消息
     * @param socketChannel
     * @param response
     * @throws IOException
     */
    private void doWrite(SocketChannel socketChannel, String response) throws IOException {
        if (!StringUtils.isEmpty(response)) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
        }
    }
}

对应的Nio客户端代码如下:

代码语言:javascript
复制
public class NioClient {
    public static void main(String[] args) {
        NioClientTask nioClientTask = new NioClientTask("127.0.0.1", 8888);
        new Thread(nioClientTask).start();
    }
}

与服务端一样,main函数这里也没有什么逻辑,还是看看NioClientTask的代码如下:

代码语言:javascript
复制
public class NioClientTask implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public NioClientTask(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            selector = Selector.open();
            //打开SocketChannel
            socketChannel = SocketChannel.open();
            //设置SocketChannel为非阻塞
            socketChannel.configureBlocking(false);
        } catch (Exception e) {

        }
    }

    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {
            e.printStackTrace();
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                SelectionKey selectionKey = null;
                while (iterator.hasNext()) {
                    selectionKey = iterator.next();
                    iterator.remove();
                    try {
                        handle(selectionKey);
                    } catch (Exception e) {
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void handle(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isValid()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //是否连接成功
            if (selectionKey.isConnectable()) {
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doWrite(socketChannel);
                } else {
                    System.exit(1);
                }
            }
        }

        if (selectionKey.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = socketChannel.read(readBuffer);
            if (readBytes > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes);
                System.out.println(body);
            } else if (readBytes < 0) {
                selectionKey.channel();
                socketChannel.close();
            } else {
                //忽略
            }
        }
    }

    /**
     * 连接服务端
     * @throws IOException
     */
    private void connect() throws IOException {
        //如果连接成功,注册到多路复用器上,发送请求
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            //不代表连接失败,等待服务端返回syn-ack
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * 写入消息
     * @param socketChannel
     * @throws IOException
     */
    private void doWrite(SocketChannel socketChannel) throws IOException {
        byte[] req = "hello world".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        socketChannel.write(writeBuffer);
    }
}

这样就完成了一个简单的NIO通讯的客户端和服务端。

代码语言:javascript
复制
服务端输入如下:
服务器启动端口:8888
获取客户端输入:hello world
客户端输出如下:
当前时间是:Wed Sep 05 17:11:07 CST 2018
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-09-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
弹性伸缩
弹性伸缩(Auto Scaling,AS)为您提供高效管理计算资源的策略。您可设定时间周期性地执行管理策略或创建实时监控策略,来管理 CVM 实例数量,并完成对实例的环境部署,保证业务平稳顺利运行。在需求高峰时,弹性伸缩自动增加 CVM 实例数量,以保证性能不受影响;当需求较低时,则会减少 CVM 实例数量以降低成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档