IO多路复用的本质是使用一个执行流同时等待多个文件描述符就绪。它解决了阻塞IO中“一个连接需要一个线程”导致的资源消耗过大问题,也解决了非阻塞IO需要不断轮询导致的CPU占用率高的问题。epoll被公认为linux2.6下性能最好的多路IO就绪通知实现方式。
注:接下来的接口讲解只是初步语法了解,也许你会听得一头雾水,不知道这些接口,参数,返回值都是干嘛的,没关系在后文讲解原理后会回头解释各个接口都是做什么。
select/poll都是用一个接口完成的,而epoll是3个接口完成。
具体使用方法可以使用man手册查看,如下:
man epoll
man epoll_create
man epoll_ctl
man epoll_wait头文件:#include <sys/epoll.h>
接口原型:
int epoll_create(int size);接口原型:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epfd:epoll模型对应的文件描述符(即epoll_create的返回值)。
op:需要执行的操作(增、删、改等),如下:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除⼀个fd;
它们本质是宏值,内核源码:

event:传入需要关心的文件描述符和事件,是一个struct epoll_event类型的地址。
关于struct epoll_event类型:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events:表示要关心的事件,可以是以下⼏个宏的集合:
EPOLLIN:表⽰对应的⽂件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表⽰对应的⽂件描述符可以写;
EPOLLPRI:表⽰对应的⽂件描述符有紧急的数据可读(这⾥应该表⽰有带外数据到来);
EPOLLERR:表⽰对应的⽂件描述符发⽣错误;
EPOLLHUP:表⽰对应的⽂件描述符被挂断;
EPOLLET:将epoll设为边缘触发(Edge Triggered)模式,这是相对于⽔平触发(Level Triggered) 来说的。
EPOLLONESHOT:只监听⼀次事件,当监听完这次事件之,如果还需要继续监听这个socket的话,需要再次把这个socket加⼊到epoll模型中。
data:通常只使用fd成员,标记需要关心的文件描述符。内核不会对data字段进行修改。
接口原型:
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);epfd:同上,epoll模型对应的文件描述符(即epoll_create的返回值)。events:输出型参数,一个struct epoll_event类型的数组首地址。(和上一个接口不同,上一个接口传的是单个元素的地址)内核会把已就绪事件的struct epoll_event从0下标开始依次存到该数组中。maxevent:输出型参数,内核告诉用户这个数组有多长,方便用户遍历。timeout:同poll,输出型参数,告诉内核阻塞等待的时间,单位是毫秒。返回值的作用同select/poll,如下:
与select相比poll在使用上的优点在于分离了输入输出文件描述符集,让它们用两个参数来管理。而epoll更是把从接口上分离了关心文件描述符的设置和就绪文件描述符的获取。(解耦的强度越来越高)
epoll模型在底层有这样一颗红黑树,储存了文件描述符信息和文件需要关心的事件信息,此外还维护了一个就绪队列,内核会将已就绪的文件信息添加到就绪队列中。
举一个不恰当的示意图:

除此之外在底层还存在一种回调机制,在传输层被触发,但底层有数据就绪,会自动进行回调,激活红黑树中的节点,激活到就绪队列中。所以在次之前特定的fd是注册过回调机制的。

对比select/poll,epoll的优势有那些?💡
注意1:epoll中检测文件描述符是否就绪的复杂度是O(1),但取出已就绪的文件描述符的时间复杂度是O(N),这是避免不了的。
注意2:检测fd就绪,回调激活节点等操作都是OS自动完成的。
各个接口在底层都在做什么工作?
epoll_create:创建红黑树、就绪队列、回调机制。epoll_ctl:对红黑树进行增删改。(其中增加事件管理的本质:创建红黑树节点和注册回调机制)epoll_wait:访问就绪队列,检查是否有文件就绪,如果有从就绪队列中取出。就绪队列的理解?
epoll_create返回的epfd是什么?为什么后面的接口都需要这个参数?epfd本质是文件描述符。后面的接口需要通过这个这个文件描述符找到epoll模型。 epoll模型怎么和文件系统扯上关系的?其实在内核中epfd文件描述符指向的文件信息是epoll模型的地址。
注:接下来解析的是内核版本是linux-2.6.18。
红黑树与就绪队列:


文件描述符epfd的作用:
epoll模型是怎么和文件系统关联的?
在文件描述符对应的 struct file结构体中,存在一个 private_data指针。调用 epoll_create后,系统会创建一个 epoll 模型(开辟一块内核空间),并分配一个文件描述符 epfd;同时,将 epfd 对应的 struct file 中的 private_data 指针,指向该 epoll 模型的内核地址。
epoll_ctl和epoll_wait就能够拿着文件描述符epfd找到文件信息private_data,进而找到epoll模型.
内核源码如下:


epoll_ctl函数的底层逻辑:

当我们进行EPOLL_CTL_ADD操作(即添加需要关心的文件描述符和事件)时,会调用ep_insert函数,目的是创建一个epitem节点并完成初始化连接到红黑树。
接下来进入ep_insert函数:

回调机制的注册:

回调机制的激活: 底层数据就绪是怎么使epoll的回调机制激活呢?我们来看内核源码:

以读事件就绪为例,在socket套接字中的sock成员里有接收队列(即接收缓冲区),当里面有数据到来时,函数sk_data_ready会被调用,而该函数是在accept时被初始化的。
如上图被初始化为sock_def_readable,接下来我们进入这个函数:

如上该函数最终回调了函数func,而func是wait_queue_t(struct __wait_queue)的成员, 而epoll的回调函数最终是被设置到func中的,如下:

socket的底层有一个回调队列,当缓冲区有数据,就会遍历该队列一一进行函数回调:

初始化部分:
#define MAXEVENTS 128
class TcpServer
{
public:
TcpServer()
{
//完成初始化(创建套接字、绑定端口、启动监听)...
//......
//创建epoll模型
_epfd = epoll_create(0);
//组织一个epoll_event变量,设置文件描述符和要关心的事件
epoll_event event;
event.data.fd = _listenfd;
event.events = EPOLLIN;
//将该文件信息设置到内核里
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listenfd, &event);
}
private:
int _listenfd;
int _epfd;
struct epoll_event _events[MAXEVENTS];
//其他成员......
}; 注意这里监听描述符_listenfd也是文件描述符,需要我们用epoll进行管理,而不是直接accept。
服务器在刚启动时,默认只有一个fd,accept本质是阻塞IO。只不过不是用来传输数据的,而它关心的是_listenfd的读事件。我们需要将_listenfd添加到epoll模型中,让epoll帮我关心读事件就绪。
程序启动后,我们直接通过epoll_wait 检查就绪队列,从中获取就绪事件:
void Start()
{
while (true)
{
int tm = 1000; // 阻塞等待1000毫秒(也设置为-1永久阻塞,或0立即返回)
int n = epoll_wait(_epfd, _events, MAXEVENTS, tm);
// 函数执行错误
if (n < 0)
{
std::cout << "epoll_wait fail\n"<< std::endl;
exit(1);
}
// 超时处理
else if (n == 0)
{
std::cout << "overtime..." << std::endl;
}
// 事件派发
else
{
std::cout << "事件就绪..." << std::endl;
HandlerEvent(n);
}
}
}新连接到来依旧不能直接recv,应该把新的fd添加到epoll模型,直接调用epoll_ctl即可,比select/poll简单得多:
_listenfd读事件就绪处理函数:
void SolveAccept()
{
//accept获取到普通fd,记为userfd
//.....
//设置到epoll模型中
epoll_event event;
event.data.fd = userfd;
event.events = EPOLLIN;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, userfd, &event);
}当调用SolveRead时说明有普通文件描述符就绪,读一次不会被阻塞。客户端退出或读取异常,需要从epoll中移除fd的关心和关闭fd。epoll_ctl只能移除合法fd,所以需要先移除后关闭。
普通文件读事件就绪处理函数:
void SolveRead(int userfd)
{
char buffer[1024];
int n = recv(userfd, buffer, sizeof(buffer) - 1, 0);
if (n <= 0)
{
//recv返回值为0,用户断开连接;recv返回值为小于0,读取异常。
//这两种情况需要将userfd从内核中移除再关闭该文件。
epoll_ctl(_epfd, EPOLL_CTL_DEL, userfd, nullptr);
close(userfd);
}
else
{
buffer[n] = '\0';
cout << "client say@: " << buffer << endl;
}
}由此可见,epoll虽然内核原理复杂但使用起来比select/poll简单得多得多。
epoll优点:
1. 高效:O(1)复杂度检测就绪事件,无需遍历所有fd。 2. 无上限:管理的fd数量仅受系统资源限制。 3. 用户态使用简便:无需每次调用都传递fd集合。 4. 内核态优化:采用回调机制而非轮询,避免无效开销。
epoll与select/poll相同,有就绪事件不处理,内核就会一直通知用户,epoll_wait调用只是检测是否有就绪事件并把就绪事件信息从就绪队列中拿出来。并没有把底层数据获取上来,会一直被激活(被再次添加到就绪队列里),这称为LT模式(水平触发模式),是epoll的默认模式。
select/poll没有模式选择,默认只有LT模式,而epoll作为多路转接的最佳实践有两种模式提供选择,即LT和ET。ET模式又称为边缘触发模式,数据就绪只通知一次,只有数据变更时才会再次被激活。
怎么理解只通知一次? 通知指的是调用epoll_wait时,内核会检查就绪队列是否有节点,如果有会把所有文件信息带出到用户态(就绪队列被清空)。如果用户不对取出来文件的就绪事件处理,那么该文件就一直处于就绪状态,但内核不会再次把它激活到就绪队列中。
这两种模式中ET模式效率更高,因为它的有效通知更多。
问题:要把数据取完就只能循环读取,而不是只读一次,那么怎么知道数据已经被读取完呢?数据读完底层没数据程序被阻塞怎么办?
为解决这些问题我们只需要把fd设置为非阻塞即可。而且ET模式下被关心的fd必须设置为非阻塞状态。设为⾮阻塞IO后,如果内核还未将数据准备好,系统调⽤仍然会直接返回。然后退出循环即可。
以上其实是一个很常见的面试题:为什么ET模式必须工作在非阻塞模式下?
LT模式需要设置为非阻塞吗? 可设,可不设。
事实上在LT模式下,程序员如果能把就绪事件一次性处理完(设为非阻塞),那么LT模式和ET模式效率和效果没有任何区别。那么ET模式存在的意义何在? ET模式的意义是增加确定性,ET模式能保证被关心的 文件fd 100%是非阻塞的(是强制要求程序员的,不这么写就会出错)。而LT模式就很随性,没约束,不能保证被关心的 文件fd 都是非阻塞。
为什么要增加确定性? 在底层接收缓冲区剩余空间越大,能给对端通告更大窗口,对端可以发更多数据,提高效率。ET模式倒逼程序员的本质:为了给对端在概率上提供更大的win窗口,提高TCP传输效率。
接收缓冲区存在低水位线(假设为 100 字节),若缓冲区中数据量低于该值,epoll 模型不会触发读事件通知(避免频繁系统调用,降低开销)。而客户端把报文设置标志位PSH的本质:要求对端只要接收缓冲区有数据就通知上层(PSH仅仅是让fd就绪)。
注意:ET模式不是以整个epoll模型为单位设置的,而是以单fd为单位。
接下来编写程序,同样的只展示epoll相关的核心代码,我们要完成ET模式下的读事件关心和写事件关心。
在初始化时要成为模型的创建,并且把监听文件描述符_listenfd设为非阻塞状态,并以关心读的方式设置到内核中,同时要设置为ET模式只需要将EPOLLET参数设置上,如下:
#define MAXEVENTS 128
class TcpServer
{
public:
TcpServer()
{
// 完成初始化(创建套接字、绑定端口、开启监听)...
//......
// 创建epoll模型
_epfd = epoll_create(0);
//把_listen设置为非阻塞
int ft = fcntl(_listenfd,F_GETFL);
fcntl(_listenfd,F_SETFL,ft|O_NONBLOCK);
// 组织一个epoll_event变量,设置文件描述符和要关心的事件
epoll_event event;
event.data.fd = _listenfd;
event.events = EPOLLIN | EPOLLET;//设置ET模式和关心读事件
// 将该文件信息设置到内核里
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listenfd, &event);
}
private:
int _listenfd;
int _epfd;
struct epoll_event _events[MAXEVENTS];
// 其他成员......
};注意:LT模式没有单独的宏参数,没有ET参数就是LT模式,有ET参数就是ET模式。
在服务器启动Start函数时,以循环的方式取检测就绪队列里有没有文件就绪,在客户端fd到来是我们把它设置为关心读就绪和写就绪,所以在检测时需要同时检测两种就绪情况(注意不可使用if else,因为一个fd可能同时读和写同时就绪,都要处理)如下:
void Start()
{
while (true)
{
int tm = 1000;//阻塞等待1000毫秒
int n = epoll_wait(_epfd, _events, MAXEVENTS, tm);
if (n > 0)
{
for (int i = 0; i < n; i++)
{
int fd = _events[i].data.fd;
uint32_t event = _events[i].events;
//如果有错误,把event都设为读写就绪,让它们在函数内处理
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if(fd == _listenfd)
SolveAccept();//_listenfd读事件处理
else
Recver(fd);//普通文件读事件处理
}
if (event & EPOLLOUT)
{
Sender(fd);//普通文件读事件处理
}
}
}
// 超时处理
else if (n == 0)
{
std::cout << "overtime..." << std::endl;
}
// 函数执行错误
else
{
std::cout << "epoll_wait fail\n" << std::endl;
exit(-1);
}
}
} SolveAccept函数在处理新连接时注意把fd设置为非阻塞,再将它设置为ET模式并关心读写事件。由于默认发送缓冲区是空的(即写事件就绪),这里可以直接调用Recver,也可以让epoll模型后面触发。
核心代码如下:
void SolveAccept()
{
// accept获取到普通文件描述符记为userfd......
//把_listen设置为非阻塞
int ft = fcntl(userfd,F_GETFL);
fcntl(userfd,F_SETFL,ft|O_NONBLOCK);
// 组织一个epoll_event变量,把文件描述符和要关心的事件设置到内核
epoll_event event;
event.data.fd = userfd;
event.events = EPOLLET | EPOLLIN | EPOLLOUT; // 设为ET模式和关心读写事件
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, userfd, &event);
//默认写事件是就绪的,所以这里可以直接调Recver,也可以让epoll模型触发。
//......
}读事件处理如下: 注意:要考虑非阻塞IO返回值处理问题
void Recver(int userfd)
{
char buffer[1024];
while (true)
{
// 循环读取数据
buffer[0] = 0;
int n = recv(userfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
// 读取成功,数据处理......
}
else if (n == 0)
{
// recv返回值为0,说明用户断开连接,需要将userfd从内核中移除再关闭该文件。
epoll_ctl(_epfd, EPOLL_CTL_DEL, userfd, nullptr);
close(userfd);
break;
}
else
{
// 如果因为底层没数据了,退出循环
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
// 如果因为处理信号被打断,进行continue
else if (errno == EINTR)
continue;
else
{
// 异常处理......
return;
}
}
}
}写事件处理同理,如下:
void Sender(int userfd)
{
while (true)
{
// 发送数据
int n = send(userfd, /*......*/);
if (n > 0)
{
// 发送成功处理......
}
else if (n == 0)
{
// 返回 0 说明操作成功完成,但请求发送的字节数为0(即没有数据需要发送了)
break;
}
else
{
// 如果写事件未就绪(即发送缓冲区满了)则退出循环。
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
// 如果因为处理信号被打断则进行continue
else if (errno == EINTR)
continue;
else
{
// 异常处理......
return;
}
}
}
}非常感谢您能耐心读完这篇文章。倘若您从中有所收获,还望多多支持呀!