在Linux中,常见的多路转接/复用有 select
、poll
和 epoll
。
多路转接的核心作用就是:对多个文件描述符进行等待(手段),通知上层哪些文件描述符已经就绪,本质是一种对IO事件就绪的通知机制。
select
函数原型:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
参数:
nfds
是需要监视的最大文件描述符值+1readfds、writefds、exceptfds
分别对应需要检测的可读、可写、异常文件描述符的集合(这三个参数是输入输出型参数,每次调用select,都要对输入参数重新设置。)timeout
为结构 timeval
, 用来设置 select()
的等待时间timeval
结构:
struct timeval {
time_t tv_sec; // 秒
suseconds_t tv_usec; // 微秒
};
timeval
结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回, 返回值为 0。
参数
timeout
取值:
NULL
: 表示 select()
没有 timeout
,select
将一直被阻塞,直到某个文件描述符上发生了事件{0,0}
(非阻塞等待): 仅检测描述符集合的状态, 然后立即返回, 并不等待外部事件的发生函数返回值:
n > 0
:就绪了多少个fdn == 0
:底层没有fd就绪,超时了,没有返回n < 0
:select等待失败了关于
select
:
fd_set
就是一个 struct+数组 的位图fd_set
是一种具体的数据类型,大小是固定的,所以 fd_set
能够包含的fd的个数是有上限的,即 select
能管理的fd个数是有上限的fd_set
中的每一个比特位对应一个文件描述符 fd,则 1 字节长的 fd_set
最大可以对应 8 个文件描述符操作 fd_set
的接口:
void FD_CLR(int fd, fd_set *set); // 清除描述词组set中相关fd的位
int FD_ISSET(int fd, fd_set *set); // 测试描述词组set中相关fd的位是否为真
void FD_SET(int fd, fd_set *set); // 设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 清除描述词组set的全部位
select
核心代码:
fd_set rfds;
FD_SET(fd, &rfds);
select(fd + 1, &rfds, nullptr, nullptr, nullptr);
if(FD_ISSET(fd, rfds))
{...}
select
执行过程:
fd_set rfds
,FD_ZERO(&rfdst)
,则rfds
用位表示是 0000 0000FD_SET(fd, &rfds)
,后rfds
变为 0001 0000rfds
变为 0001 0011select(6, &rfds, 0, 0, 0)
阻塞等待rfds
变为0000 0011,而没有事件发生的 fd=5 被清空
select
的特点
select
可管理的fd有上限,可监控的文件描述符个数取决于 sizeof(fd_set) 的值。
select
的缺点:
select
都需要手动设置 fd 集合,不方便select
都需要在内核遍历传递进来的所有 fd,把 fd 集合从用户态拷贝到内核态, fd很多时开销很大select
支持的文件描述符数量太小select
使用示例:
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
#define NUM sizeof(fd_set) * 8
using namespace SocketModule;
using namespace LogModule;
const int gdefaultfd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port)
: _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
{
}
void Init()
{
_listen_socket->BuildTcpSocketMethod(_port);
// 初始化辅助数组
for (int i = 0; i < NUM; i++)
{
_fd_array[i] = gdefaultfd;
}
// 将listensockfd添加到数组中
_fd_array[0] = _listen_socket->Fd();
}
void Loop()
{
fd_set rfds; // 读文件描述符集
_isrunning = true;
while (_isrunning)
{
// 1.清空rfds
FD_ZERO(&rfds);
int maxfd = gdefaultfd;
// // 2.将listensockfd添加到rfds中
// FD_SET(_listen_socket->Fd(), &rfds);
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == gdefaultfd)
continue;
// 将合法的fd设置进入集合中
FD_SET(_fd_array[i], &rfds);
if (_fd_array[i] > maxfd)
{
maxfd = _fd_array[i];
}
}
// InetAddr client;
// // accept 是阻塞式的
// // 从listensockfd获取新链接,本质也是一种IO
// int newsockfd = _listen_socket->Accepter(&client);
struct timeval timeout = {10, 0};
// 我们不能让accept来阻塞检测链接到来,而应该让select负责进行就绪事件的检测
int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
switch (n)
{
case 0:
std::cout << "time out..." << std::endl;
break;
case -1:
perror("select");
break;
default:
std::cout << "有事件就绪了..., timeout: " << timeout.tv_sec << std::endl;
Dispatcher(rfds); // 把已经就绪的fd,派发给指定的模块
break;
}
}
_isrunning = false;
}
void Accepter()
{
InetAddr client;
// 不会阻塞了,也就是不用等了,只用拷贝就行
int newfd = _listen_socket->Accepter(&client);
if (newfd < 0)
return;
else
{
std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
// 把新的newfd添加到辅助数组中
int pos = -1;
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == gdefaultfd)
{
pos = i;
break;
}
}
if (pos == -1)
{
LOG(LogLevel::ERROR) << "服务器已经满了...";
close(newfd);
}
else
{
_fd_array[pos] = newfd;
}
}
}
void Recver(int who)
{
char buffer[1024];
ssize_t n = recv(_fd_array[who], buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client# " << buffer << std::endl;
std::string message = "echo# ";
message += buffer;
send(_fd_array[who], message.c_str(), message.size(), 0);
}
else if (n == 0)
{
LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fd_array[who];
close(_fd_array[who]);
_fd_array[who] = gdefaultfd;
}
else
{
LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fd_array[who];
close(_fd_array[who]);
_fd_array[who] = gdefaultfd;
}
}
void Dispatcher(fd_set &rfds)
{
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == gdefaultfd)
continue;
if (_fd_array[i] == _listen_socket->Fd())
{
// 是listensockfd,并且已经添加的集合中了
if (FD_ISSET(_fd_array[i], &rfds))
{
Accepter();
}
}
else
{
// 就绪的普通fd
if (FD_ISSET(_fd_array[i], &rfds))
{
Recver(i);
}
}
}
}
~SelectServer()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listen_socket;
bool _isrunning;
int _fd_array[NUM]; // 辅助数组
};
select
是一种通用、简便的I/O多路复用机制,适用于小规模连接和处理简单的并发场景。
和 select
一样,poll
的定位也是对多个 fd IO事件的等待机制,达到事件派发的目的。
fd + events
):用户告诉内核,你要帮我关心哪个fd上的哪些事件fd + revents
):内核告诉用户,你要关心的哪个fd上的哪些事件已经就绪了参数说明:
fds
是一个 poll 函数监听的结构列表,每一个元素中,包含了三部分内容:文件描
述符,监听的事件集合,返回的事件集合nfds
表示 fds 数组的长度(也就是说poll支持的文件描述符理论上没有上限)timeout
表示 poll 函数的超时时间,单位是毫秒。>0
:timeout 时间内阻塞式,==0
非阻塞,<0
阻塞式。返回值:
< 0
:出错 == 0
:等待超时 > 0
:有n个 fd 就绪
poll
解决了select
两个最主要的问题:1、支持的文件描述符有限,2、每次调用都 要重新设置参数。
poll
使用示例:
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <poll.h>
#include "Log.hpp"
#include "Socket.hpp"
#define MAX 4096
using namespace SocketModule;
using namespace LogModule;
const int gdefaultfd = -1;
class PollServer
{
public:
PollServer(uint16_t port)
: _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
{
}
void Init()
{
_listen_socket->BuildTcpSocketMethod(_port);
// 初始化结构体数组
for (int i = 0; i < MAX; i++)
{
_fds[i].fd = gdefaultfd;
_fds[i].events = 0;
_fds[i].revents = 0;
}
// 将listensockfd添加到数组中
_fds[0].fd = _listen_socket->Fd();
_fds[0].events |= POLLIN;
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
int timeout = 1000;
int n = poll(_fds, MAX, timeout);
switch (n)
{
case 0:
std::cout << "time out..." << std::endl;
break;
case -1:
perror("select");
break;
default:
std::cout << "有事件就绪了..." << std::endl;
Dispatcher(); // 把已经就绪的fd,派发给指定的模块
break;
}
}
_isrunning = false;
}
void Accepter()
{
InetAddr client;
// 不会阻塞了,也就是不用等了,只用拷贝就行
int newfd = _listen_socket->Accepter(&client);
if (newfd < 0)
return;
else
{
std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
// 把新的newfd添加到辅助数组中
int pos = -1;
for (int i = 0; i < MAX; i++)
{
if (_fds[i].fd == gdefaultfd)
{
pos = i;
break;
}
}
if (pos == -1)
{
// 可以对数组扩容
LOG(LogLevel::ERROR) << "服务器已经满了...";
close(newfd);
}
else
{
_fds[pos].fd = newfd;
_fds[pos].events |= POLLIN;
}
}
}
void Recver(int who)
{
char buffer[1024];
ssize_t n = recv(_fds[who].fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client# " << buffer << std::endl;
std::string message = "echo# ";
message += buffer;
send(_fds[who].fd, message.c_str(), message.size(), 0);
}
else if (n == 0)
{
LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fds[who].fd;
close(_fds[who].fd);
_fds[who].fd = gdefaultfd;
_fds[who].events = _fds[who].revents = 0;
}
else
{
LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fds[who].fd;
close(_fds[who].fd);
_fds[who].fd = gdefaultfd;
_fds[who].events = _fds[who].revents = 0;
}
}
void Dispatcher()
{
for (int i = 0; i < MAX; i++)
{
if (_fds[i].fd == gdefaultfd)
continue;
if (_fds[i].fd == _listen_socket->Fd())
{
// 是listensockfd,并且已经添加的集合中了
if (_fds[i].revents & POLLIN)
{
Accepter();
}
}
else
{
// 就绪的普通fd
if (_fds[i].revents & POLLIN)
{
Recver(i);
}
}
}
}
~PollServer()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listen_socket;
bool _isrunning;
struct pollfd _fds[MAX];
};
| poll 的缺点:
虽然 poll
已经解决了 select
的两个主要问题,但 poll
中监听的文件描述符数目增多时也会和 select 函数一样,函数返回后需要轮询 pollfd 来获取就绪的描述符,每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中,这种情况下效率也不是很高。同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
epoll
是 Linux 系统中一种高效的 I/O 事件通知机制,常用于处理大量文件描述符的 I/O 事件,特别适合高并发场景。它是 select
和 poll
的改进版本,几乎解决了它们的所有问题。对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用 epoll
。
int epoll_create(int size);
epoll
模型,并返回文件描述符用于后续的操作;close()
关闭。epoll
模型:
epoll
能够高效地管理大量的文件描述符;epoll_wait
函数来等待这些就绪的文件描述符,并从就绪队列中获取;epoll
的事件注册函数。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
epfd
:epoll_create()
的返回值op
:操作,用三个宏表示: EPOLL_CTL_ADD
:注册新的 fd 到 epfd 中EPOLL_CTL_MOD
:修改已经注册的 fd 的监听事件EPOLL_CTL_DEL
:从 epfd 中删除一个 fdfd
:需要监听的 fdevent
:需要监听的事件epoll_event
的结构:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events
可以是以下几个宏的集合:
宏 | 说明 |
---|---|
EPOLLIN | 文件描述符可以读 (包括对端 SOCKET 正常关闭) |
EPOLLOUT | 文件描述符可以写 |
EPOLLPRI | 文件描述符有紧急的数据可读 (这里应该表示有带外数据到来) |
EPOLLERR | 文件描述符发生错误 |
EPOLLHUP | 文件描述符被挂断 |
EPOLLET | 将 epoll 设为边缘触发(Edge Triggered)模式 |
EPOLLONESHOT | 只监听一次事件,当监听完这次事件之后如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 epoll 队列里 |
int epoll_wait(int epfd, struct epoll_event *events,int maxevents,
int timeout);
events
数组中 (events 不可以是空指针,内核
只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)maxevents
告之内核这个 events
有多大, maxevents 的值不能大于创建epoll_create()时的 sizeepoll_create
方法时,Linux 内核会创建一个 eventpoll
结构体,这个结构体中有两个成员与 epoll
的使用方式密切相关。struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
epoll
对象都有一个独立的 eventpoll
结构体,用于存放通过 epoll_ctl
方法向 epoll
对象中添加进来的事件;epoll
中的事件都会与设备(网卡)驱动程序建立回调关系,当响应的事件发生时会调用回调方法 ep_poll_callback
,它会将发生的事件添加到 rdlist
双链表中;epoll
中,对于每一个事件,都会建立一个 epitem
结构体。struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的 eventpoll 对象
struct epoll_event event; //期待发生的事件类型
}
epoll_wait
检查是否有事件发生时,只需要检查 eventpoll
对象中的
rdlist
双链表中是否有 epitem
即可;rdlist
不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是 O(1)。对比 select
和 poll
,epoll
几乎没有缺点:
epoll
默认的状态就是 LT (Level Triggered)(水平触发)工作模式;
epoll
会立即通知应用程序,但是可以不立即进行处理,或者只处理一部分;
rdlist
中)(例如缓冲区中有数据可读或可写),epoll
就会持续通知应用程序,直到应用程序处理完所有就绪事件并且再次进入阻塞等待状态;
events
设置 EPOLLET
,就可以让 epoll
进入 ET (Edge Triggered)(边缘触发)工作模式;
epoll
才会通知应用程序,并且 ET 要求必须立刻处理;
rdlist
中立马移除),只有一次处理机会,如果应用程序没有及时处理完这个事件,下次等待时将会错过该事件,即使事件仍然处于就绪状态;
epoll_wait
返回次数少了很多),并且 ET 只支持非阻塞的读写(ET 模式下要求我们每次都要把数据取完,要保证取完就要循环读取,要循环读取就必须是非阻塞的);
ET 为什么比 LT 更高效? ET 不做重复通知,一旦通知,数据必须取完(提高IO带宽),这就倒逼着我们在 ET 模式下不得不把数据取完,另外也会间接地给对方返回一个更大的接收窗口,提高对方滑动窗口的大小(Tcp),提高IO吞吐量。
epoll
使用示例:
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"
#define MAX 4096
using namespace SocketModule;
using namespace LogModule;
const int gdefaultfd = -1;
class EpollServer
{
static const int revs_num = 64;
public:
EpollServer(uint16_t port)
: _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false), _epfd(gdefaultfd)
{
}
void Init()
{
_listen_socket->BuildTcpSocketMethod(_port);
// 1.创建epoll模型
_epfd = epoll_create(256);
if (_epfd < 0)
{
LOG(LogLevel::ERROR) << "epoll create error";
Die(EPOLL_CREATE_ERROR);
}
LOG(LogLevel::DEBUG) << "epoll create success: " << _epfd;
// 2.将listensockfd添加到epoll模型中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listen_socket->Fd();
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listen_socket->Fd(), &ev);
if (n < 0)
{
LOG(LogLevel::ERROR) << "epoll_ctl error";
Die(EPOLL_CTL_ERROR);
}
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
int timeout = -1;
int n = epoll_wait(_epfd, _revs, revs_num, timeout);
switch (n)
{
case 0:
std::cout << "time out..." << std::endl;
break;
case -1:
perror("epoll_wait");
break;
default:
std::cout << "有事件就绪了..." << std::endl;
Dispatcher(n); // 把已经就绪的fd,派发给指定的模块
break;
}
}
_isrunning = false;
}
void Accepter()
{
InetAddr client;
// 不会阻塞了,也就是不用等了,只用拷贝就行
int newfd = _listen_socket->Accepter(&client);
if (newfd < 0)
return;
else
{
std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
struct epoll_event ev;
ev.data.fd = newfd;
ev.events = EPOLLIN;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, newfd, &ev);
if (n < 0)
{
LOG(LogLevel::ERROR) << "epoll_ctl error";
close(newfd);
}
}
}
void Recver(int fd)
{
char buffer[1024];
ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client# " << buffer << std::endl;
std::string message = "echo# ";
message += buffer;
send(fd, message.c_str(), message.size(), 0);
}
else if (n == 0)
{
LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << fd;
// close(fd);
// 把fd从epoll中移除,必须保证fd是合法的
// 应该先把fd从epoll中移除,在关闭fd
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
if (n < 0)
{
LOG(LogLevel::ERROR) << "epoll_ctl error";
Die(EPOLL_CTL_ERROR);
}
LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
close(fd);
}
else
{
LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << fd;
// close(fd);
// 把fd从epoll中移除
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
if (n < 0)
{
LOG(LogLevel::ERROR) << "epoll_ctl error";
Die(EPOLL_CTL_ERROR);
}
LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
close(fd);
}
}
void Dispatcher(int rnum)
{
for (int i = 0; i < rnum; i++)
{
if (_revs[i].data.fd == _listen_socket->Fd())
{
if (_revs[i].events == EPOLLIN)
{
Accepter();
}
}
else
{
if (_revs[i].events == EPOLLIN)
{
Recver(_revs[i].data.fd);
}
}
}
}
~EpollServer()
{
close(_listen_socket->Fd());
if (_epfd) close(_epfd);
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listen_socket;
bool _isrunning;
int _epfd;
struct epoll_event _revs[revs_num];
};
本篇文章的分享就到这里了,如果您觉得在本文有所收获,还请留下您的三连支持哦~