前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解I/O多路转接模型:select & poll & epoll

详解I/O多路转接模型:select & poll & epoll

作者头像
二肥是只大懒蓝猫
发布2023-10-13 11:42:04
4680
发布2023-10-13 11:42:04
举报
文章被收录于专栏:热爱C嘎嘎热爱C嘎嘎

文章技术分享思路:从select模型开始,先了解select模型的理论基础,然后编写简单的基于select的tcp服务器,接着分析出select的特点和缺点。引出poll模型,了解了poll模型的基础理论,编写简单的基于poll的tcp服务器。接着引出重要的epoll模型,了解epoll模型的理论以及原理,编写简单的基于epoll的tcp服务器,总结select、poll和epoll的区别,ET模式下epoll高效的原因。

多路转接

多路转接是IO模型的一种,这种IO模型通过select、poll或者epoll进行IO等待,可以同时等待多个文件描述符,当某个文件描述符的事件就绪,便会通知上层处理对应的事件。

I/O多路转接之select

了解select的基础理论

分享的流程是:先介绍什么是select、然后介绍select系统调用,接着了解select执行过程。

什么是select?

系统提供select函数来实现多路复用输入/输出模型。select系统调用是用来让我们的程序监视多个文件描述符的状态变化的。程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。

通俗的来讲,select函数,就是负责等待,得到文件描述符就绪后,通知上层进行读取或写入。select没有读取或写入数据的功能,并且select能够同时等待多个文件描述符。

select函数原型
代码语言:javascript
复制
#include <sys/select.h>

int select(int nfds,  fd_set  *readfds,  fd_set  *writefds, fd_set  *exceptfds,  struct timeval *timeout);

参数解释:

①参数nfds:需要监视的最大的文件描述符值+1。+1是为了确定遍历的范围(在内核层面上),也就是说在内核中需要遍历数组中,哪些文件描述符是合法的,需要等待事件就绪的,因此需要一个遍历的范围。

要解释readfds、writefds和exceptfds前,先解释它们的类型fd_set类型。

fd_set类型

fd_set是一个整数数组, 更严格的说, 是一个 "位图"。使用位图中对应的位来表示要监视的文件描述符。

在fd_set位图结构中,使用比特位的“位置”来表示某一个sock

而对于比特位的“内容”,首先我们需要知道的是,readfds、writefds和exceptfds三个参数都是输入输出型参数。

以readfds读为例:

用户在使用该参数进行输入时,实质上是用户告诉内核,内核你要帮我关心一下哪些文件描述符上的读事件就绪。 内核进行输出时,实质上是告诉用户,用户你所关心的那些文件描述符上的读事件已经就绪。

于是,对于比特位的“内容”,首先是输入时,是用户想要内核帮忙关心的文件描述符的合集。在输出时,是内核要告诉用户已经就绪的文件描述符的合集

比如,输入时,我们规定用户想要关心的文件描述,在位图结构中,其比特位的位置位1,3,5,于是在输入时,将其内容置为1,表示我们需要让select帮我们关心1,3,5文件描述符。那么在输出时,假设这些文件描述符1,5都已经就绪,输出回来时,这个合集中的1,5比特位的位置上的内容为1,而3由于没有就绪,就为0。需要注意的是,输入输出的都是同一个位图,是同一个!

提供了一组操作fd_set的接口, 来比较方便的操作位图:

void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位。 int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真。 void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位。 void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位。

②readfds、writefds和exceptfds三个参数:分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合。

③参数timeou:参数timeout为结构timeval,用来设置select()的等待时间。一般timeou参数的取值有三种:

NULL/nullptr:填入nullptr或者NULL时,表示阻塞。即表示select()没有timeout, select将一直被阻塞,直到某个文件描述符上发生了事件,即只要不就绪,就不返回。 0:当struct timeval timeout={0,0},即为0时,表示非阻塞。仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生,即只要不就绪,立马返回。 特定的时间值:当struct timeval timeout={5,0}。表示,在5秒内阻塞,5秒后非阻塞。如果在指定的时间段里没有事件发生, select将超时返回。

timeval结构

timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0。

④select函数返回值

当返回值ret>0:表示已有几个fd已经就绪。比如ret = 2,就有2个fd就绪。 当返回值ret==0,表示超时返回 当返回值ret<0,select调用失败

错误值可能为:

EBADF 文件描述词为无效的或该文件已关闭。 EINTR 此调用被信号所中断。 EINVAL 参数n 为负值。 ENOMEM 核心内存不足。

理解select执行过程

理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节, fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。

*(1)执行fd_set set; FD_ZERO(&set);则set用位表示是0000,0000。 *(2)若fd= 5,执行FD_SET(fd,&set).后set变为0001,0000(第5位置为1)。 *(3)若再加入fd= 2, fd=1,则set变为0001,0011。 *(4)执行select(6,&set,0,0,0)阻塞等待。 *(5)若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。 *   注意:没有事件发生的fd=5被清空。

需要注意的是,因为select使用输入输出型参数标识不同的含义,因此每一此都会被清空,这意味着,每一次都需要对fd_set进行重新设置!并且,因为需要重新设置,我们需要通过第三方数组来对这些文件描述符进行保存,这是select的特点之一,也可以称为缺点,因为需要额外开辟空间且需要程序员自己去维护,这个缺点会在epoll中消失!

代码简单实现基于select的tcp服务器

使用select实现一个简单tcp服务器,客户端可以向服务端发送消息,服务端读取数据。

代码思路:

selectServer.hpp:

先进行服务器通信前的操作:创建监听套接字-绑定服务器的ip和端口号、让服务器进去监听状态,接着开辟第三方数组的空间,并让它初始化,默认为-1;

启动服务器:首先将需要等待的文件描述符,从第三方数组中拷贝到位图中,接着使用select进行等待。如果 监测到有事件就绪,那么进行处理。

事件处理:在事件处理中,需要遍历一次第三方数组,找到合法的文件描述符,然后判断这些文件描述符是否在位图中,如果是,那么还要判断,这个文件描述,是监听套接字,还是其它的套接字文件描述符,然后分情况处理;

处理监听套接字的事件:到了这一步,说明监听套接字事件就绪(有客户的请求连接),此时使用accept进行获取并且返回一个用于通信的套接字。注意,此时并不能马上读取数据,因此这个用于通信的套接字不一定事件就绪。因此,需要将其放回第三方数组,等循环第二遍的时候,让select去等待事件就绪。

处理其它的套接字(这里是用于通信的套接字):到了这一步,说明客户端有数据发送过来了。此时,我们可以读取数据了!

实现代码:

selectServer.hpp

代码语言:javascript
复制
#pragma once
#include <iostream>
#include "Sock.hpp"
#include <functional>
#include <string>

namespace select_ns
{
    static const int defalutport = 8080;
    static const int fd_num = sizeof(fd_set)*8;
    static const int defalutval = -1;
    using func_t = std::function<std::string(const std::string&)>;
    class SelectServer
    {
    private:
        int _listensock;
        int _port;
        int* _fdarray;/*第三方数组*/
        func_t _func;
    public:
        SelectServer(func_t f,int port = defalutport)
            :_listensock(-1)
            ,_port(port)
            ,_fdarray(nullptr)
            ,_func(f)
        {

        }
        ~SelectServer()
        {
            if(_listensock !=defalutval) close(_listensock);
            delete[] _fdarray;
        }

        void initServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock,_port);
            Sock::Listen(_listensock);
            /*初始化第三方数组*/
            _fdarray = new int[fd_num];

            for(int i = 0;i<fd_num;++i)/*初始化一次遍历*/
            {
                _fdarray[i] = defalutval;
            }
            _fdarray[0] = _listensock;/*默认第三方数组的第一个位置的内容是监听套接字*/
        }

        void Accepter(int listensock)
        {
            logMessage(DEBUG, "Accepter in");
            std::string clientip;
            uint16_t clientport = 0;
            int sock = Sock::Accept(listensock,&clientip,&clientport);
            if(sock < 0) return;
            logMessage(NORMAL,"accept success [%s:%d]",clientip.c_str(),clientport);
            /*用于通信的套接字不一定事件就绪,因此需要重新放入第三方数组中*/
            //accept返回的用于通信的套接字,需要交给select去监管。因此,放入到数组中即可,等
            //返回去的时候,就会将其添加到位图中,然后select。
            //对数组进行循环遍历,判断是否满了,如果满了,那就不再对这个套接字进行监听
            //没有满,那就添加进去
            int i = 0;
            for(;i<fd_num;++i)
            {
                if(_fdarray[i]!=defalutval) continue;
                else break;
            }
            if(i==fd_num)
            {
                logMessage(WARNING,"server is full,please wait!");
                close(sock);
            }
            else
            {
                _fdarray[i] = sock;
            }
            logMessage(DEBUG, "Accepter out");
        }

        void Recver(int sock,int pos)
        {
            /*到了这一步,用于通信的套接字已经事件就绪了*/
            /*开始读取,其实这里的读取操作是不正确的,因为不知道这一次读取,是否读完整*/
            /*这样操作是为了方便演示基于select的tcp服务器的实现*/
            char buffer[1024];
            ssize_t s = recv(sock,buffer,sizeof(buffer)-1,0);
            if(s>0)
            {
                /*读取到数据*/
                buffer[s] = 0;
                logMessage(NORMAL,"client#  %s",buffer);
            }
            else if(s==0)/*客户端关闭连接*/
            {
                close(sock);
                _fdarray[pos] = defalutval;
                logMessage(NORMAL,"client quit");
                return;
            }
            else/*读取发生错误*/
            {
                close(sock);
                _fdarray[pos] = defalutval;
                logMessage(ERROR,"recv err");
                return;
            }

            /*响应给客户端*/
            std::string response = _func(buffer);
            write(sock,response.c_str(),response.size());
        }
        void HandlerEvent(fd_set& rfds)
        {
            for(int i = 0;i<fd_num;++i)
            {
                if(_fdarray[i]==defalutval) continue;
                if(FD_ISSET(_fdarray[i],&rfds) && _fdarray[i]==_listensock)
                {
                    Accepter(_fdarray[i]);
                }
                else if(FD_ISSET(_fdarray[i],&rfds))
                {
                    Recver(_fdarray[i],i);
                }
                else {}
            }

        }
        void start()
        {   
            for(;;)
            {
                fd_set rfds;/*位图*/
                FD_ZERO(&rfds);/*对位图进行清空*/
                int maxfd = _fdarray[0];
                for(int i = 0;i<fd_num;++i)/*更新/设置fd一次遍历*/
                {
                    if(_fdarray[i]!=defalutval)
                    {
                        FD_SET(_fdarray[i],&rfds);   /*将合法的文件描述符设置到位图中*/
                    }
                    if(maxfd < _fdarray[i]) maxfd = _fdarray[i];/*找出最大值*/
                }
                struct timeval timeout = {5,0};
                int n = select(maxfd+1,&rfds,nullptr,nullptr,&timeout);
                switch(n)
                {
                case 0: 
                    logMessage(NORMAL,"timeout...");
                    break;
                case -1:
                    logMessage(WARNING,"err...");
                    break;
                default:
                    /*事件就绪*/
                    logMessage(NORMAL,"Have a ready events...");
                    HandlerEvent(rfds);
                    break;
                }
            }
        }
    };
}

main.cc

代码语言:javascript
复制
#include "selectServer.hpp"
#include <memory>
using namespace std;
using namespace select_ns;

static void Usage(std::string proc)
{
    std::cerr<<"Usage:\n\t "<<proc<<"prort "<<std::endl;
}

std::string transaction(const std::string& request)
{
    return request;
}
// ./select_server 8081
int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    unique_ptr<SelectServer> srv(new SelectServer(transaction));
    srv->initServer();/*初始化服务器*/
    srv->start();/*启动服务器*/
    return 0;
}

封装套接字Sock.hpp:

代码语言:javascript
复制
#pragma once

#include<iostream>
#include<string>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include "err.hpp"
#include "log.hpp"

/*封装socket套接字*/
/*1.创建socket套接字
  2.进行套接字 端口号和ip进行绑定,绑定是服务器的
  3.让服务器进入监听状态
  4.最后接受客户端的连接请求,创建一个新的套接字进行与客户端进行通信*/
class Sock 
{
    const static int backlog = 32;
public:
    static int Socket()
    {
        //1.创建socket文件套接字对象,socket返回的是socket套接字即文件描述符
        //AF_INET表示IPV4协议,SOCKET_STREAM表示使用TCP协议
        int sock = socket(AF_INET,SOCK_STREAM,0);
        if(sock < 0)
        {
            logMessage(FATAL,"create socket error");
            exit(SOCKET_ERR);
        }
        logMessage(NORMAL,"create socket success: %d",sock);

        int opt = 1;
        //setsockopt函数,
        //SO_REUSEADDR选项允许在套接字处于TIME_WAIT状态(通常在套接字被关闭后出现)时,重新使用本地地址进行绑定。当服务器需要快速重启时
        //SO_REUSEPORT选项允许多个套接字在同一端口上同时绑定。这对于负载均衡和并发处理请求
        //SOL_SOCKET是用于setsockopt函数的第一个参数,它表示要设置选项的套接字级别
        setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
        return sock;
    }

    static void Bind(int sock,int port)
    {
        /*在这段代码中,结构体local绑定的IP地址和端口号是服务器的,而不是客户端的。
        local.sin_addr.s_addr = INADDR_ANY;将IP地址设置为INADDR_ANY,表示服务器可以通过任意可用的本地IP地址接受连接。这样,服务器可以监听所有本地网卡上的连接请求。
        local.sin_port=htons(port);是设置要绑定的端口号,该端口号指定了服务器将监听的特定端口。客户端将使用该端口号连接到服务器。
        */
        struct sockaddr_in local;/**/
        memset(&local,0,sizeof(local));
        local.sin_family = AF_INET;/*地址族:AF_INET表示ipv4*/
        local.sin_port=htons(port);/*端口号,主机字节序转换为网络字节序*/
        local.sin_addr.s_addr = INADDR_ANY;/*IP地址,NADDR_ANY表示任意可用的本地IP地址。*/
        if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
        {
            logMessage(FATAL,"bind socket error");
            exit(BIND_ERR);
        }
        logMessage(NORMAL,"bind socket success");
    }

    static void Listen(int sock)
    {
        if(listen(sock,backlog)<0)
        {
            logMessage(FATAL,"listen socket error");
            exit(LISTEN_ERR);
        }
        logMessage(NORMAL,"listen socket success");
    }

    static int Accept(int listensock,std::string *clientip,uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(listensock,(struct sockaddr*)&peer,&len);
        if(sock<0)
        {
            logMessage(FATAL,"accept error,next");
        }
        else
        {
            logMessage(NORMAL,"accept a new link success,get new sock: %d",sock);
            *clientip = inet_ntoa(peer.sin_addr);
            *clientport = ntohs(peer.sin_port);
        }
        return sock;
    }

};

日志类log.hpp:

代码语言:javascript
复制
#pragma once
#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<unistd.h>

#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4

const char * to_levelstr(int level)
{
    switch(level)
    {
        case DEBUG : return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARNING : return "WARNING";
        case ERROR : return "ERROR";
        case FATAL : return "FATAL";
        default : return nullptr;
    }
}

void logMessage(int level,const char *format,...)
{
#define NUM 1024
    char logprefix[NUM];
    snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid: %d]",to_levelstr(level),(long long)time(nullptr),getpid());
    char logcontent[NUM];
    va_list arg;
    va_start(arg,format);
    vsnprintf(logcontent,sizeof(logcontent),format,arg);
    std::cout<<logprefix<<logcontent<<std::endl;
}

错误码的设置err.hpp:

代码语言:javascript
复制
#pragma once

#include<iostream>

enum
{
    USAGE_ERR = 1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR
};

select的特点/缺点

  • select能够同时等待的文件描述符的数量是有限的。因为select采用fd_set类型的位图结构来得到需要等待的文件描述,fd_set的大小已经被确定了。因此是无法让程序员自定义数组的大小。
  • 必须借助第三方数组来维护文件描述符。在上面的示例代码中可以发现,当监听套接字_listensock的事件就绪,可以通过accept来获取客户端的请求连接后,我们得到了一个用于通信的套接字sock,此时这个sock不一定是事件就绪了,比如客户端成功与服务器建立连接后,就是没有了后续的操作,此时我们就需要将sock交给select去等待事件就绪,因此需要放在第三方数组中。
  • select的参数大部分都是输入输出型的,每次调用select之前都需要重新设置位图,重新放置文件描述符进去,更新fd,这就需要多次遍历数组。并且需要频繁的内核态和用户态的相互转换,拷贝有成本!

于是乎,poll出现了!poll相对于select来说,解决了select中两个问题:

  • 等待的文件描述符数量有限的问题。
  • 每次调用select都需要重新设置需要等待的文件描述符。

I/O多路转接之poll

分享流程:了解poll的接口,从而了解poll相对于select的好处,接着将上面的基于select的tcp服务器的代码改编成基于poll的服务器,最后总结一下poll的缺点。

poll函数原型

代码语言:javascript
复制
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// pollfd结构
struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events */
    short revents; /* returned events */
};

参数解释:

①struct pollfd* fds:可以看成是一个动态开辟的数组。从struct pollfd结构中可以看出来,成员有fd、events和revents。

  • fd:需要poll去等待的文件描述符。
  • events:用户告诉内核,该文件描述符的需要等待的事件。(读事件、写事件和异常)
  • revents:内核告诉用户,该文件描述符的需要等待的事件就绪。

这个参数的设计,就可以做到了输入输出分离,不需要像select那样内核和用户共用一个位图去修改位图,因此在调用poll的时候,不需要重新设置文件描述符。并且做到了解决等待文件描述符数量上限的问题,因为fds数组的大小我们可以自己定义大小!

events和revents的取值:

② nfds_t nfds:fds的大小。

③int timeout:时间单位是ms。

  • timeeout > 0:在timeout时间内阻塞,时间过了以后非阻塞
  • timeout ==0:非阻塞等待
  • timeout < 0:阻塞等待

函数返回值

返回值小于0, 表示出错。 返回值等于0, 表示poll函数等待超时。 返回值大于0, 表示poll由于监听的文件描述符就绪而返回。

代码实现简单基于poll的tcp服务器 

代码思路:

与select类似。

实现代码:

pollServer.hpp

代码语言:javascript
复制
#include "Sock.hpp"
#include<poll.h>
#include <iostream>
#include <functional>
#include <string>

namespace poll_ns
{
    static const int defaultport = 8080;/*默认端口号*/
    static const int fd_num = 1024;/*可以等待的最大文件描述符数量*/
    static const int defaultval = -1;
    using func_t = std::function<std::string(const std::string&)>;

    class PollServer
    {
    private:
        int _listensock;
        int _port;
        struct pollfd* _rfds;
        func_t _func;

    public:
        PollServer(func_t f,int port = defaultport)
            :_port(port)
            ,_listensock(-1)
            ,_rfds(nullptr)
            ,_func(f)
        {}
        ~PollServer()
        {
            if(_listensock!=defaultport) close(_listensock);
            delete[] _rfds;
        }
        void initServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock,_port);
            Sock::Listen(_listensock);
            _rfds = new struct pollfd[fd_num];

            /*初始化*/
            for(int i = 0;i<fd_num;++i)
            {
                _rfds[i].fd = defaultval;
                _rfds[i].events = 0;
                _rfds[i].revents = 0;
            }
            _rfds[0].fd = _listensock;
            _rfds[0].events = POLLIN;
            _rfds[0].revents = 0;
        }
        void Accepter(int listensock)
        {
            logMessage(DEBUG, "Accepter in");
            std::string clientip;
            uint16_t clientport;
            int sock = Sock::Accept(listensock,&clientip,&clientport);
            if(sock < 0) return;
            logMessage(NORMAL,"accept success [%s:%d]",clientip.c_str(),clientport);
            int i = 0;
            /*将sock交给poll去等待事件就绪*/
            for(;i<fd_num;++i)
            {
                if(_rfds[i].fd!=defaultval) continue;
                else break;
            }
            if(i==fd_num)
            {
                logMessage(WARNING,"server is full,please wait!");
                close(sock);
            }
            else
            {
                _rfds[i].fd = sock;
                _rfds[i].events = POLLIN;
                _rfds[i].revents = 0;
            }
            logMessage(DEBUG, "Accepter out");
        }
        void Recver(int pos)
        {
            /*用于通信的套接字到这已经就绪了*/
            char buffer[1024];
            ssize_t s = recv(_rfds[pos].fd,buffer,sizeof(buffer)-1,0);
            if(s > 0)
            {
                buffer[s] = 0;
                logMessage(NORMAL,"client#  %s",buffer);
            }
            else if(s == 0)
            {
                close(_rfds[pos].fd);
                _rfds[pos].fd = defaultval;
                _rfds[pos].events = 0;
                _rfds[pos].revents = 0;
                logMessage(NORMAL,"client quit");
                return;
            }
            else
            {
                close(_rfds[pos].fd);
                _rfds[pos].fd = defaultval;
                _rfds[pos].events = 0;
                _rfds[pos].revents = 0;
                logMessage(NORMAL,"client quit");
                return;
            }
            std::string response = _func(buffer);
            write(_rfds[pos].fd,response.c_str(),response.size());
        }
        void HandlerEvent()
        {
            for(int i = 0;i<fd_num;++i)
            {
                if(_rfds[i].fd==defaultval) continue;
                if(_rfds[i].events!=POLLIN) continue;
                if((_rfds[i].fd==_listensock) && (_rfds[i].revents & POLLIN))
                {
                    Accepter(_rfds[i].fd);
                }
                else if(_rfds[i].revents & POLLIN)
                {
                    Recver(i);
                }

            }
        }
        void start()
        {
            int timeout = 3000;/*3000ms*/
            for(;;)
            {
                int n = poll(_rfds,fd_num,timeout);
                switch(n)
                {
                case 0:
                    logMessage(NORMAL,"timeout...");
                    break;
                case -1:
                    logMessage(WARNING,"err...");
                    break;
                default:
                    //有事件就绪了
                    logMessage(NORMAL,"Have a ready events...");
                    HandlerEvent();
                    break;
                }
            }

        }


    };
}

main.cc

代码语言:javascript
复制
#include "pollServer.hpp"
#include<memory>
using namespace poll_ns;

std::string transaction(const std::string& request)
{
    return "server accepted: "+request;
}

// ./select_server 8080
int main(int agrc,char* agrv[])
{
    std::unique_ptr<PollServer> srv(new PollServer(transaction));
    srv->initServer();
    srv->start();

    return 0;
}

poll主要的问题,还是遍历的问题。需要在设置需要关心的文件描述符以及事件的时候,需要遍历一下数据fds。

总结一下poll的优缺点 

优点:不同与select使用三个位图来表示三个fdset的方式, poll使用一个pollfd的指针实现。

  • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。接口使用比select更方便。
  • poll并没有最大数量限制 (但是数量过大后性能也是会下降)。

缺点:poll中监听的文件描述符数目增多时

  • 和select函数一样, poll返回后,需要轮询pollfd来获取就绪的描述符。
  • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中。
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降。

I/O多路转接之epoll

分享流程:先初识epoll,然后了解一下epoll的接口,然后结合接口调用,理解epoll的原理,接着基于epoll实现简单的TCP服务器。

初识epoll

按照man手册的说法: 是为处理大批量句柄而作了改进的poll。它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)。它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll的相关系统调用

epoll有三个系统调用,分别是:epoll_create()、epoll_ctl()、epoll_wait();

epoll_create

作用:创建一个epoll的句柄。这个句柄用完后,必须使用close关闭掉。至于这个句柄是什么,得结合原理来理解。

代码语言:javascript
复制
int epoll_create(int size);

参数size是一个整数,指定内核为epoll实例所分配的监听文件描述符数目的一个建议值。这个值在大多数情况下被忽略,而且现在已经不再具有实际意义,可以将其设置为任意值(大于0)。

epoll_ctl

作用:epoll_ctl的作用就是将需要等待事件就绪的文件描述符和需要等待的事件注册起来,让epoll去等待事件就绪。

代码语言:javascript
复制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  • 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型。
  • 第一个参数是epoll_create()的返回值(epoll的句柄)。
  • 第二个参数表示动作,用三个宏来表示。动作有:增、删、改。
  • 第三个参数是需要监听的fd.
  • 第四个参数是告诉内核需要监听什么事。

第二个参数的取值:

EPOLL_CTL_ADD :注册新的fd到epfd中; EPOLL_CTL_MOD :修改已经注册的fd的监听事件; EPOLL_CTL_DEL :从epfd中删除一个fd;

struct epoll_event结构如下:

events可以是以下几个宏的集合:

EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭); EPOLLOUT : 表示对应的文件描述符可以写; EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来); EPOLLERR : 表示对应的文件描述符发生错误; EPOLLHUP : 表示对应的文件描述符被挂断; EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的; EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里;

至于epoll_data_t结构体中的成员,可能会有人很疑惑为什么里面还有一个fd,epoll_ctl外面不也有一个fd了吗,这个得从代码演示中再讲解清楚!

epoll_wait

作用:将已经就绪的事件通知上层。

代码语言:javascript
复制
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  • 参数events是分配好的epoll_event结构体数组。
  • epoll将会把就绪的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size。
  • 参数timeout是超时时间 (毫秒, 0会立即返回, -1是永久阻塞)。
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败。

epoll的底层原理

在谈epoll的底层原理前,我们需要知道的一件事就是:操作系统怎么知道网络中有数据传来?学过机组的伙伴都知道了,是网卡通过向CPU发送中断信号,让CPU去中断向量表中找到相应的处理函数进行处理,通过驱动调用将数据从外设拷贝到内存中的OS内部。OK,打住!接下来我来讲讲epoll的模型。

实质上,在epoll的模型中,有一棵红黑树,这颗红黑树上的节点,以文件描述符为key值,并且存储了文件描述符需要等待的事件,红黑树左右孩子的指针,以及双向链表的向前指针和向后指针!这些值都存储在了一个叫epitem的结构体中。这棵红黑树的节点就是用户需要内核需要等待事件就绪的文件描述的集合!对应的接口就是epoll_ctl。在epoll模型中,我们在使用的时候,,该红黑树就对应着select和poll中的第三方数组,已经由内核去维护了!这是epoll与poll、select的区别之一。

代码语言:javascript
复制
struct epitem{
    struct rb_node rbn;//红黑树节点
    struct list_head rdllink;//双向链表节点
    struct epoll_filefd ffd; //事件句柄信息
    struct eventpoll *ep; //指向其所属的eventpoll对象
    struct epoll_event event; //期待发生的事件类型
}

在epoll模型中,还有一条就绪队列,该队列是一条双向链表,该队列的节点就是事件已经就绪的文件描述符fd!对应的接口就是epoll_wait()。对于epoll_wait(),是不需要遍历检测哪些文件描述符是合法的,是已经事件就绪的,因为在这个队列中,全都是合法的,全都是事件就绪的了!并且,epoll_wait()会将所有就绪的事件,按照升序放到用户传入的数组中。

那么问题来了,如何得知哪些文件描述符的事件就绪呢?怎么将已经就绪的文件描述符放到就绪队列中呢?答案如下:

首先,每一个节点对应着一个文件描述符fd,那么底层就会有一个struct file的文件操作对象,该对象中有一个缓冲区的字段,还有一个回调函数的指针。

当网卡发送中断信号,CPU处理中断处理函数后,网卡驱动会将数据拷贝到这个缓冲区中,接着调用相应的回调函数,将处于红黑树的对应的节点中的双向链表的向前指针和向后指针链入就绪队列中,至此,该文件描述符的事件就绪,请上层处理!

而在这整个epoll模型,就是使用epoll_create创建出来的,使用其epoll句柄进行操作!而这个epoll句柄是也是一个文件描述符,这个文件描述符会在进程PCB中的文件描述符表中找到!在使用epoll_create的时候,会创建出一个eventpoll这个结构体,这个结构体里面有两个指针成员,一个指向就绪队列,一个指向红黑树。进程通过epoll句柄找到这个结构体,从而操作红黑树和就绪队列。

代码语言:javascript
复制
struct eventpoll
{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

原理图:

 代码实现简单基于epoll的TCP服务器

实现思路:

初始化服务器:在初始化服务器的时候,将服务器设置成监听状态,并且为用于接收事件就绪的文件描述符的数组开辟空间,接着创建epoll句柄。

启动服务器:使用epoll_wait获取事件就绪的文件描述符,并且处理该文件描述符。根据这个文件描述是监听套接字还是用于通信的套接字来进行相应的处理。

处理监听套接字:到了这一步,代表该监听套接字事件就绪,那么就可以进行获取客户端请求连接了,并且得到用于通信的套接字。将这个套接字添加到epoll中,记得设置其事件。

处理用于通信的套接字:到了这一步步,代表该套接字已经就绪,那么进行通信。

代码:
代码语言:javascript
复制
#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "Sock.hpp"

namespace epoll_ns
{
    static const int defaultport = 8888;
    static const int defaultval = -1;
    static const int size = 128;
    static const int defalultnum = 1024;
    using func_t = std::function<std::string(const std::string&)>;
    class EpollServer
    {
    private:
        int _listensock;/*监听套接字*/
        int _port;/*服务器端口号*/
        struct epoll_event* _revs;/*用于存放事件已经就绪的文件描述符*/
        int _epollfd;/*epoll句柄*/
        int _num;/*数组的大小*/
        func_t _func;/*处理响应的内容的函数*/
    public:
        EpollServer(func_t f,int num = defalultnum,int port = defaultport)
            :_port(port)
            ,_listensock(defaultval)
            ,_revs(nullptr)
            ,_epollfd(defaultval)
            ,_num(num)
            ,_func(f)
        {

        }
        ~EpollServer()
        {
            if(_listensock!=defaultval) close(_listensock);
            if(_epollfd!=defaultval) close(_epollfd);
            if (_revs)
                delete[] _revs;
        }

        void initServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock,_port);
            Sock::Listen(_listensock);
            /*申请事件就绪空间*/
            _revs = new struct epoll_event[_num];

            /*创建epoll句柄*/
            _epollfd = epoll_create(size);
            if (_epollfd < 0)
            {
                logMessage(FATAL, "epoll create error: %s", strerror(errno));
                exit(EPOLL_CREATE_ERR);
            }

            /*将_listensock添加到epoll中注册起来*/
            struct epoll_event ev;/*创建出epoll_event的结构体,存储需要等待的文件描述符和事件*/
            ev.events = EPOLLIN;/*需要等待的事件*/
            ev.data.fd = _listensock;/*需要等待的描述符*/
            epoll_ctl(_epollfd,EPOLL_CTL_ADD,_listensock,&ev);/*注册起来,在底层就是挂在了红黑树上*/
            logMessage(NORMAL, "init server success");
        }
        void HandlerEvent(int readyNum)
        {
            for(int i = 0;i<readyNum;++i)
            {
                uint32_t events = _revs[i].events;
                int sock = _revs[i].data.fd;//这里可以说明了,为什么在epoll_ctl的参数中有fd了,而结构体里还需要再填入fd,因为需要提出来处理事件
                if(sock==_listensock && (events & EPOLLIN))/*监听套接字*/
                {   
                    std::string clientip;
                    uint16_t clientport;
                    int fd = Sock::Accept(sock,&clientip,&clientport);
                    if (fd < 0)
                    {
                        logMessage(WARNING, "accept error");
                        continue;
                    }
                    /*将用于通信的套接字fd添加到epoll中,也就是挂到红黑树中*/
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(_epollfd,EPOLL_CTL_ADD,fd,&ev);/*添加*/
                    
                }
                else if(events & EPOLLIN)/*用于通信的套接字*/
                {
                    /*这里仅仅是为了演示epoll,这里接收数据的操作是不正确的,因为不能保证数据的完整性*/
                    char buffer[1024];
                    ssize_t s = recv(sock,buffer,sizeof(buffer)-1,0);
                    if(s > 0)/*数据接收完毕*/
                    {
                        buffer[s] =0;
                        logMessage(DEBUG, "client# %s", buffer);
                        // TODO
                        std::string response = _func(buffer);
                        send(sock, response.c_str(), response.size(), 0);
                    }
                    else if(s==0)/*客户端断开连接*/
                    {
                        // 建议先从epoll移除,才close fd
                        epoll_ctl(_epollfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(NORMAL, "client quit");
                    }
                    else  /*读取失败*/
                    {
                        // 建议先从epoll移除,才close fd
                        epoll_ctl(_epollfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));
                    }
                }
                else
                {}
                logMessage(DEBUG, "HandlerEvent out");
            }
        }
        void start()
        {
            int timeout = -1;/*阻塞*/
            for( ; ; )
            {
                /*获取事件就绪的文件描述符*/
                int n = epoll_wait(_epollfd,_revs,_num,timeout);
                switch(n)
                {
                case 0:
                    logMessage(NORMAL, "timeout ...");
                    break;
                case -1:
                    logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
                    break;
                default:
                    logMessage(NORMAL, "have event ready");
                    HandlerEvent(n);
                    break;
                }
            }
        }
    };
}

总结epoll、poll和select的区别

关于epoll,还没结束,但我们先将epoll、poll和select进行总结,再往后走吧!

首先先列出它们三者的特点:

select的特点:

上面已经说过,看到这那就复习一遍吧!

  • select能够同时等待的文件描述符的数量是有限的。因为select采用fd_set类型的位图结构来得到需要等待的文件描述,fd_set的大小已经被确定了。因此是无法让程序员自定义数组的大小。
  • 必须借助第三方数组来维护文件描述符。在上面的示例代码中可以发现,当监听套接字_listensock的事件就绪,可以通过accept来获取客户端的请求连接后,我们得到了一个用于通信的套接字sock,此时这个sock不一定是事件就绪了,比如客户端成功与服务器建立连接后,就是没有了后续的操作,此时我们就需要将sock交给select去等待事件就绪,因此需要放在第三方数组中。
  • select的参数大部分都是输入输出型的,每次调用select之前都需要重新设置位图,重新放置文件描述符进去,更新fd,这就需要多次遍历数组。并且需要频繁的内核态和用户态的相互转换,拷贝有成本!
poll的特点:

  • 解决了select中等待文件描述符数量上限的问题。
  • 遍历问题。
epoll的优点:

epoll的优点是与select的缺点对应的

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开。
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响。
  • 没有数量限制: 文件描述符数目无上限。

需要注意的是,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的。

总结它们三者之前的优缺点:

  • 在可以等待的文件描述符的数量上:select是有上限的,取决于它的位图的类型fd_set的大小。poll跟poll是没有上限的。
  • 在设置需要等待的文件描述符这个操作上面:每次调用select都需要重新将需要等待的文件描述符设置到位图中,因为这个参数是输入输出型,用户在传入这个位图来告诉内核需要等待的文件描述符,当事件就绪,内核就会修改这个位图,来通知用户哪些文件描述已经就绪。而poll和epoll是不需要重新设置的,因为在epoll和poll中,这两个操作是分离开的。
  • 数据拷贝的问题:select和poll都需要将大量的fd从用户态拷贝到内核中,而epoll则是在进行EPOLL_CTL_ADD的时候,才会进行拷贝,因此epoll的拷贝相对于较少。
  • 遍历事件就绪的文件描述符的问题:select和poll都需要遍历一下,哪些文件描述符已经就绪了,而epoll不需要,因为使用epoll_wait可以直接从就绪队列中获取已经事件就绪的文件描述符,时间复杂度为O(1)。

epoll的ET模式和LT模式

当IO就绪,意味着epoll需要通知上层处理事件,那么在通知机制上,有两种机制,分别是ET模式和LT模式。

LT模式,即水平触发Level Triggered 工作模式。在这个模式下,epoll会不断通知用户去处理,只要用户还没处理完,或者没去处理,epoll就会不断通知,比如读取数据的时候,没读完,那么epoll就会不断地催促着用户去读取,直到读完。在这个模式下,文件描述符可以是阻塞的,也可以是非阻塞的。

ET模式,即边缘触发Edge Triggered工作模式。在这个模式下,epoll只会通知一次,就不再通知用户了,除非底层来了新的数据,才会再通知一次!在ET模式下,文件描述符必须是非阻塞的

假设ET模式下的文件描述符是阻塞的,而ET模式会“逼迫”用户将数据一次读完,而如果我们不站在上帝视角,接收端是不知道数据有多少的,因此就需要循环读取!如果fd是阻塞的,在读取一次后,没读取完,而发送端需要接收端发送ACK,发送端需要读取完才能发送,那么就进程就会阻塞住了!因此,ET模式下,fd必须是非阻塞的,才能循环读取,直到数据读取完毕!

ET模式下epoll高效的原因

从ET模式下,会逼迫用户一次性读取完数据这个点上,我们可以推出以下机制:

让用户一次性读取完数据----->说明TCP可以给接收方提供更大的窗口大小!----->发送方有更大的滑动窗口-------->从而提供更高的数据发送效率,更好地利用延迟应答等机制,从而高效地进行网络通信!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多路转接
  • I/O多路转接之select
    • 了解select的基础理论
      • 什么是select?
      • select函数原型
      • 理解select执行过程
    • 代码简单实现基于select的tcp服务器
      • 代码思路:
      • 实现代码:
    • select的特点/缺点
    • I/O多路转接之poll
      • poll函数原型
        • 代码实现简单基于poll的tcp服务器 
          • 代码思路:
          • 实现代码:
          • 总结一下poll的优缺点 
      • I/O多路转接之epoll
        • 初识epoll
          • epoll的相关系统调用
            • epoll_create
            • epoll_ctl
            • epoll_wait
          • epoll的底层原理
            •  代码实现简单基于epoll的TCP服务器
              • 实现思路:
              • 代码:
          • 总结epoll、poll和select的区别
            • 首先先列出它们三者的特点:
              • select的特点:
              • poll的特点:
              • epoll的优点:
            • 总结它们三者之前的优缺点:
            • epoll的ET模式和LT模式
              • ET模式下epoll高效的原因
              相关产品与服务
              负载均衡
              负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档