前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】第三十篇:Netty 之 Java NIO

【Flink】第三十篇:Netty 之 Java NIO

作者头像
章鱼carl
发布2022-03-31 11:24:13
8490
发布2022-03-31 11:24:13
举报
文章被收录于专栏:章鱼carl的专栏

相关推荐:

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十八篇:Direct Memory 一箩筐

从本篇开始回顾总结Netty通信框架,尝试为读者揭开它的神秘面纱。Flink内部节点之间的通信是用Akka,比如JobManager和TaskManager之间的通信。而operator之间的数据传输是利用Netty。

1. Akka:基于协程,性能好;基于scala的偏函数,易用性高。但它只是RPC通信。

2. Netty:相比更加基础一点,可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持。

关于Netty笔者计划分两篇讲述。Netty是基于Java NIO的,所以本篇主要介绍,

  1. IO 中的基本概念
  2. 5种 IO 模型
  3. IO多路复用

下一篇将介绍Netty的主要设计思想、核心抽象、IO线程模型等。

Java NIO核心两部分:零拷贝、IO模型(IO多路复用),其中零拷贝已经在【Flink】第十八篇:Direct Memory 一箩筐介绍过。

阻塞/非阻塞、同步/异步

阻塞/非阻塞

举例,你打电话问书店老板有没有《操作系统》这本书:

1. 阻塞 - 如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果。

2. 非阻塞 - 如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了,当然你也要偶尔过几分钟check一下老板有没有返回结果或者老板找到后主动call你手机。

阻塞和非阻塞关注的是主调的状态。主调在等待调用结果(返回值)时的状态(是否还做其他事)。

即如果这个调用方法立即返回,那么主调就可以去做其他事,则是非阻塞,有执行结果才返回,那么主调就不可以去做其他事,则是阻塞。

同步/异步

举例,你打电话问书店老板有没有《操作系统》这本书:

1. 同步 - 书店老板会说,你稍等,我查一下,然后开始查啊查。等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。

2. 异步 - 书店老板直接告诉你我查一下啊,然后直接挂电话了(立即返回一个不一定是调用结果的返回值)。等查好了,他会主动打电话给你,在这里老板通过回电这种方式来回调。

同步和异步关注的是通信机制。主调主动询问被调调用结果还是被调主动通知主调调用结果。

1. 同步 - 主调主动询问被调的调用结果:每次都是主调去主动询问被调用结果,不管调用方法是不是立即返回。

2. 异步 - 被调主动通知主调的调用结果:主调在发出调用之后,调用立即返回了,即使没有返回调用结果。随后,被调执行后得到结果,导致被调状态改变,触发特定事件,调用主调的回调方法,完成被调主动通知主调。

基本概念

用户空间 / 内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。

操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。

针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行,这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:

1) 保存处理机上下文,包括程序计数器和其他寄存器。(即上下文切换)

2) 更新PCB信息。

3) 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。

4) 选择另一个进程执行,并更新其PCB。

5) 更新内存管理的数据结构。

6) 恢复处理机上下文。

在这个过程中存在冯诺依曼瓶颈:寄存器和内存之间的拷贝耗时太多,浪费CPU时间。

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

文件描述符fd

文件描述符(File Descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在linux系统中,系统内核有个缓冲区叫做内核缓冲区。每个进程也有自己独立的缓冲区,叫做进程缓冲区。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的优点:

缓冲区的目的,是为了减少频繁的系统IO调用。有了缓冲区,操作系统使用read函数把数据从内核缓冲区复制到进程缓冲区,write把数据从进程缓冲区复制到内核缓冲区中。等待缓冲区达到一定数量的时候,再进行IO的调用,提升性能。至于什么时候读取和存储则由内核来决定,用户程序不需要关心。

缓存 I/O 的缺点:

数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

所以,用户程序的IO读写程序,大多数情况下,并没有进行实际的IO操作,而是在读写自己的进程缓冲区。

IO读写原理

用户程序进行IO的读写,基本上会用到read&write两大系统调用。

  • read系统调用,是把数据从内核缓冲区复制到进程缓冲区;
  • write系统调用,是把数据从进程缓冲区复制到内核缓冲区。

IO模型

1. 这里的阻塞与非阻塞

阻塞IO,指的是需要内核IO操作彻底完成后,才返回到用户空间,执行用户的操作。阻塞指的是用户空间程序的执行状态,用户空间程序需等到IO操作彻底完成。传统的IO模型都是同步阻塞IO。在java中,默认创建的socket都是阻塞的。非阻塞指不等内核IO操作完就返回,所以用户需要多次调用去判断什么时候内核IO操作完成。

2. 这里的同步与异步

同步IO是指用户空间线程是主动发起IO请求的一方,内核空间是被动接受方。异步IO则反过来,是指内核kernel是主动发起IO请求的一方,用户线程是被动接受方。

5种 IO 模型:

  1. Blocking IO - 阻塞IO
  2. Non-Blocking IO - 非阻塞IO
  3. IO multiplexing - IO多路复用
  4. Signal driven IO - 信号驱动IO
  5. Asynchronous IO - 异步IO

1. Blocking IO - 阻塞IO

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概如下图:

例如,发起一个blocking socket的read读操作系统调用,流程大概是这样:

(1) 当用户线程调用了read系统调用,内核(kernel)就开始了IO的第一个阶段:准备数据。很多时候,数据在一开始还没有到达(比如,还没有收到一个完整的Socket数据包),这个时候kernel就要等待足够的数据到来。

(2) 当kernel一直等到数据准备好了,它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存),然后kernel返回结果。

(3) 从开始IO读的read系统调用开始,用户线程就进入阻塞状态。一直到kernel返回结果后,用户线程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在内核进行IO执行的两个阶段,用户线程都被block了。

BIO的优点:

程序简单,在阻塞等待数据期间,用户线程挂起。用户线程基本不会占用 CPU 资源。

BIO的缺点:

一般情况下,会为每个连接配套一条独立的线程,或者说一条线程维护一个连接成功的IO流的读写。在并发量小的情况下,这个没有什么问题。但是,当在高并发的场景下,需要大量的线程来维护大量的网络连接,内存、线程切换开销会非常巨大。因此,基本上,BIO模型在高并发场景下是不可用的。

2. Non-Blocking IO - 非阻塞IO

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

例如,发起一个non-blocking socket的read读操作系统调用,流程是这个样子:

(1) 在内核数据没有准备好的阶段,用户线程发起IO请求时,立即返回。用户线程需要不断地发起IO系统调用。

(2) 内核数据到达后,用户线程发起系统调用,用户线程阻塞。内核开始复制数据。它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存),然后kernel返回结果。

(3) 用户线程才解除block的状态,重新运行起来。经过多次的尝试,用户线程终于真正读取到数据,继续执行。

NIO的特点:

应用程序的线程需要不断的进行 I/O 系统调用,轮询数据是否已经准备好,如果没有准备好,继续轮询,直到完成系统调用为止。

NIO的优点:

每次发起的IO系统调用,在内核的等待数据过程中可以立即返回。用户线程不会阻塞,实时性较好。

NIO的缺点:

需要不断的重复发起IO系统调用,这种不断的轮询,将会不断地询问内核,这将占用大量的 CPU 时间,系统资源利用率较低。

3. IO multiplexing - IO多路复用

I/O多路复用是网络编程中最常用的模型,像最常用的select、epoll都属于这种模型。以select为例:

(1) 进行select/epoll系统调用,查询可以读的连接。kernel会查询所有select的可查询socket列表,当任何一个socket中的数据准备好了,select就会返回。当用户进程调用了select,那么整个线程会被block(阻塞掉)。

(2) 用户线程获得了目标连接后,发起read系统调用,用户线程阻塞。内核开始复制数据。它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存),然后kernel返回结果。

(3) 用户线程才解除block的状态,用户线程终于真正读取到数据,继续执行。

多路复用IO的特点:

IO多路复用模型,建立在操作系统kernel内核能够提供的多路分离系统调用select/epoll基础之上的。多路复用IO需要用到两个系统调用(system call),一个select/epoll查询调用,一个是IO的读取调用。

多路复用IO的优点:

用select/epoll的优势在于,它可以同时处理成千上万个连接(connection)。与一条线程维护一个连接相比,I/O多路复用技术的最大优势是:系统不必创建线程,也不必维护这些线程,减少了用户态和空间态的切换,以及系统内核轮训所有文件描述符的次数,因为有IO事件才会callback事件到就绪队列中。

多路复用IO的缺点:

本质上,select/epoll系统调用,属于同步IO,也是阻塞IO。都需要在读写事件就绪后,自己负责进行读写,也就是说这个读写过程是阻塞的。

如何充分的解除线程的阻塞呢?那就是异步IO模型。

4. Asynchronous IO - 异步IO

(1) 当用户线程调用了read系统调用,立刻就可以开始去做其它的事,用户线程不阻塞。

(2) 内核(kernel)就开始了IO的第一个阶段:准备数据。当kernel一直等到数据准备好了,它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存)。

(3) kernel会给用户线程发送一个信号(signal),或者回调用户线程注册的回调接口,告诉用户线程read操作完成了。

(4) 用户线程读取用户缓冲区的数据,完成后续的业务操作。

异步IO模型的特点:

在内核kernel的等待数据和复制数据的两个阶段,用户线程都不是block(阻塞)的。用户线程需要接受kernel的IO操作完成的事件,或者说注册IO操作完成的回调函数,到操作系统的内核。所以说,异步IO有的时候,也叫做信号驱动 IO 。

异步IO模型缺点:

需要完成事件的注册与传递,这里边需要底层操作系统提供大量的支持,去做大量的工作。目前来说, Windows 系统下通过 IOCP 实现了真正的异步 I/O。而在 Linux 系统下,异步IO模型在2.6版本才引入,目前并不完善。所以,这也是在 Linux 下,实现高并发网络编程时都是以 IO 复用模型模式为主。

IO多路复用

1. select

时间复杂度O(n)

它仅仅知道有I/O事件发生了,却并不知道是哪几个流(可能有一个,多个,甚至全部),它只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。

2. poll

时间复杂度O(n)

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,但是它没有最大连接数的限制,原因是它是基于链表来存储,而不是数组

3. epoll

时间复杂度O(1)

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll会把哪个流发生了怎样的I/O事件通知我们。所以我们说epoll实际上是事件驱动(每个事件关联fd),此时我们对这些流的操作都是有意义的。

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,用一个线程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

Reactor反应模式

Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis、Tomcat、Java NIO在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

1. 单/多线程IO

最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

代码语言:javascript
复制
while(true){
    socket = accept();
    handle(socket)
}

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

代码语言:javascript
复制
class BasicModel implements Runnable {
    public void run() {
        try {
            ServerSocket ss =
                    new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[SystemConfig.INPUT_SIZE];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] input) {
            byte[] output=null;
            /* ... */
            return output;
        }
    }
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

优点:

一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

缺点:

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进方法:

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

2. 单线程Reactor

Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。

代码语言:javascript
复制
static class Server
{
        public static void testServer() throws IOException
        {
            // 1、获取Selector选择器
            Selector selector = Selector.open();
            // 2、获取通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 3、设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 4、绑定监听端口
            serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));
            // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
            while (selector.select() > 0)
            {
                // 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext())
                {
                    // 8、获取“准备就绪”的时间
                    SelectionKey selectedKey = selectedKeys.next();
                    // 9、判断key是具体的什么事件
                    if (selectedKey.isAcceptable())
                    {
                        // 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 11、切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        // 12、将该通道注册到selector选择器上
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    else if (selectedKey.isReadable())
                    {
                        // 13、获取该选择器上的“读就绪”状态的通道
                        SocketChannel socketChannel =
(SocketChannel) selectedKey.channel();
                        // 14、读取数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int length = 0;
                        while ((length = socketChannel.read(byteBuffer)) != -1)
                        {
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array(), 0, length));
                            byteBuffer.clear();
                        }
                        socketChannel.close();
                    }
                    // 15、移除选择键
                    selectedKeys.remove();
                }
            }
            // 7、关闭连接
            serverSocketChannel.close();
        }
        public static void main(String[] args) throws IOException
        {
            testServer();
        }
}

抽象出来两个组件——Reactor和Handler两个组件:

(1) Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

(2) Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

缺点:

(1) 当其中某个 handler 阻塞时,会导致其他所有的client 的 handler 都得不到执行,并且更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

(2) 因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

3. 多线程Reactor

在单线程Reactor模式基础上,做如下改进:

(1) 将Handler处理器的执行放入线程池,多线程进行业务处理。

(2) 对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档