EFD_CLOEXEC
: 在调用 exec
系列函数时自动关闭文件描述符。EFD_NONBLOCK
: 使读写操作非阻塞。uint64_t val = 1;
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
eventfd
写入 1
三次,计数器的值累加为 3
。uint64_t res = 0;
read(efd, &res, sizeof(res));
printf("res = %lu\n", res);
从 eventfd
读取计数器的值,res
被设置为 3
,同时计数器清零。
输出结果为:
res = 3
close(efd);
eventfd
文件描述符,释放资源。常见用途
- 使用 `eventfd` 实现生产者-消费者模型或信号量机制。
- 在多线程或多进程环境中,用于通知某些事件的发生。
- 将 `eventfd` 文件描述符加入 `epoll`,用于事件驱动的程序中。
注意事项
- 如果计数器的值超过 `UINT64_MAX`,`write` 会返回错误 `EOVERFLOW`
- 如果设置了 `EFD_NONBLOCK`,在计数器为 0 时调用 `read` 会返回 `-1` 并设置 `errno` 为 `EAGAIN`
- 如果设置了 `EFD_SEMAPHORE`,每次读取会返回 `1`,并将计数器减 1,而不是清零
Eventloop
:进行事件监控,以及事件处理的模块(关键点:这个模块和线程是一一对应的)
如何保证一个连接的所有操作都在 eventloop
对应的线程中
eventloop
模块中,添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中eventloop
处理流程:
事件监控
注意:由于有可能因为等待描述符 IO 事件就绪,导致执行流流程阻塞,这时候任务队列中的任务得不到指向
代码实现
class EventLoop{
public:
void RunAllTask(){
// 在加锁期间取出所有任务, 给锁限定作用域
std::vector<Functor> tasks;
{
std::lock_guard<std::mutex> lock(_mutex); // 加锁
tasks.swap(_tasks); // 交换任务池, 取出所有任务
}
for(auto &t: tasks){
t(); // 执行任务
}
return ;
}
static int CreateEventfd(){
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(efd < 0){
LOG_ERROR("CREATE EVENTFD ERROR");
abort(); // 退出程序
}
return efd;
}
void ReadEventFd(){
uint64_t data = 0;
ssize_t ret = read(_event_fd, &data, sizeof(data));
if(ret < 0){
if(errno == EAGAIN || errno == EINTR){
return; // 没有数据可读
}
LOG_ERROR("READ EVENTFD ERROR");
abort(); // 退出程序
}
return ;
}
// 唤醒事件循环
void WakeupEventFd(){
uint64_t data = 1;
ssize_t ret = write(_event_fd, &data, sizeof(data));
if(ret < 0){
if(errno == EAGAIN || errno == EINTR){
return; // 没有数据可读
}
LOG_ERROR("WRITE EVENTFD ERROR");
abort(); // 退出程序
}
return ;
}
public:
using Functor = std::function<void()>;
EventLoop()
: _thread_id(std::this_thread::get_id()), // 获取当前线程 ID
_event_fd(CreateEventfd()), // 创建 eventfd 唤醒 IO 事件监控
_event_channel(new Channel(this, _event_fd)) // 创建事件循环的 Channel
{
//给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
_event_channel->EnableRead(); // 设置可读事件
}
// 判断当前线程是否是 EventLoop 中对应线程
bool IsInLoop() {return _thread_id == std::this_thread::get_id();}
// 修改/添加 描述符的事件监控
void UpdateEvent(Channel* channel){
assert(IsInLoop()); // 断言: 当前线程是事件循环线程
_poller.UpdateEvent(channel); // 修改/添加事件监控
}
void RemoveEvent(Channel* channel){
assert(IsInLoop());
_poller.RemoveEvent(channel); // 移除事件监控
}
// 事件监控->就绪事件处理->执行任务
void Start(){
// 1, 事件监控
std::vector<Channel*> actives; // 活跃连接
_poller.Poll(&actives); // 进行事件监控
// 2, 事件处理
for(auto &channel: actives){
channel->HandleEvent(); // 处理事件
}
// 3, 执行任务
RunAllTask(); // 执行任务
}
// 压入任务队列
void QueueInLoop(const Functor& cb){
{
std::lock_guard<std::mutex> lock(_mutex); // 加锁
_tasks.emplace_back(cb); // 压入任务
}
// 唤醒事件循环 -- 由于没有事件就绪 导致的 epoll 阻塞
// 其实就是给 eventfd 写入一个数据, 使得 epoll 事件就绪
WakeupEventFd();
}
// 判断要执行任务是否处于当前线程, 如果是则执行, 不是则压入队列
void RunInLoop(const Functor& cb){
if(IsInLoop()){
cb();
}else{
QueueInLoop(cb);
}
}
private:
std::thread::id _thread_id; // 事件循环线程 ID
int _event_fd; // eventfd 唤醒 IO 事件监控可能导致的阻塞
// 注意: 这里的 Channel用智能指针进行管理, Poller 使用的对象
std::unique_ptr<Channel> _event_channel; // 事件循环的 Channel
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 互斥锁
};
// 注意: 这里的 Channel 类也要做一些改变, 类似于 Poller 模块的处理改变
void Channel::Remove(){return _loop->RemoveEvent(this); } // 移除事件
void Channel::Update(){return _loop->UpdateEvent(this);} // 更新事件
由于我们需要用到我们之前所说的 TimeWheel 模块,并且对其做一些改变
EventLoop
线程中执行,避免线程安全问题timerfd
触发定时任务,与 epoll
事件监控无缝结合① TimerWheel 与 EventLoop 的绑定
TimerWheel
构造函数 :
TimerWheel(EventLoop *loop)
: _capacity(60), _tick(0), _loop(loop),
_timerfd(CreateTimerfd()),
_timer_channel(new Channel(_loop, _timerfd)) {
_wheel.resize(_capacity);
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead(); // 启动读事件监控
}
TimerWheel
依赖于一个 EventLoop
实例,所有定时任务的执行都通过该 EventLoop
的线程完成timerfd
的可读事件触发定时任务处理(OnTime
)② 定时器任务的执行流程
OnTime
函数
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
auto& tasks = _wheel[_tick];
for (auto& task : tasks) {
if (!task->_canceled) {
task->_cb(); // 执行回调
}
task->_release(); // 释放资源
}
tasks.clear(); // 清空当前 tick 的任务
}
timerfd
被触发时,OnTime
会被调用,读取超时次数并依次处理每个 tick 的任务。OnTime
是 Channel
的读事件回调,由 EventLoop
的线程调用,确保所有定时任务在事件循环线程中执行。RunTimerTask
函数 :
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
auto& tasks = _wheel[_tick];
for (auto& task : tasks) {
if (!task->_canceled) {
task->_cb(); // 执行回调
}
task->_release(); // 释放资源
}
tasks.clear(); // 清空当前 tick 的任务
}
_release
删除 TimerWheel
中保存的任务映射,避免内存泄漏。③ 定时器任务的添加与刷新
添加任务 :
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt); // 将任务添加到轮子中
_timers[id] = WeakTask(pt); // 保存任务映射
}
shared_ptr
管理任务生命周期,weak_ptr
避免循环引用。tick
和延迟时间 delay
计算任务在轮子中的位置。刷新任务 :
void TimerRefreshInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) return;
PtrTask pt = it->second.lock();
if (!pt) return;
int remaining = pt->DelayTime();
int pos = (_tick + remaining) % _capacity;
_wheel[pos].push_back(pt); // 重新插入任务
}
重新插入 :将任务移动到新的 tick 位置,实现延迟效果。
④ 线程安全保证
任务队列机制 :
QueueInLoop
方法 :所有对 Channel
或定时器的操作都通过 EventLoop::QueueInLoop
提交到任务队列。RunInLoop
方法 :确保操作在事件循环线程中执行。WakeupEventFd
:通过写入 eventfd
唤醒阻塞的 epoll_wait
,及时处理任务队列。定时器回调的线程一致性 :
TimerTask
析构函数 :
~TimerTask() {
if (!_canceled) _cb(); // 直接执行回调
_release();
}
关键点 :_cb()
的执行必须在 EventLoop
线程中,通过 TimerWheel::OnTime
触发,无需额外线程同步
class EventLoop {
public:
// 添加定时器任务
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc &cb) {
return _timer_wheel.AddTimer(id, delay, cb);
}
// 刷新定时器
void RefreshTimer(uint64_t id) {
return _timer_wheel.RefreshTimer(id);
}
// 取消定时器
void CancelTimer(uint64_t id) {
return _timer_wheel.CancelTimer(id);
}
private:
TimerWheel _timer_wheel; // 定时器轮
};
TimerWheel 类关键函数
class TimerWheel {
public:
// 添加定时任务(供 EventLoop 调用)
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
// 刷新定时任务
void RefreshTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
// 取消定时任务
void CancelTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerCanceInLoop, this, id));
}
private:
// 实际添加任务的逻辑
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
// 实际刷新任务的逻辑
void TimerRefreshInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) return;
PtrTask pt = it->second.lock();
if (!pt) return;
int remaining = pt->DelayTime();
int pos = (_tick + remaining) % _capacity;
_wheel[pos].push_back(pt);
}
// 实际取消任务的逻辑
void TimerCanceInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) return;
PtrTask pt = it->second.lock();
if (pt) pt->Cancel();
}
// 移除任务
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if (it != _timers.end()) _timers.erase(it);
}
// 执行定时任务
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
auto& tasks = _wheel[_tick];
for (auto& task : tasks) {
if (!task->_canceled) task->_cb(); // 执行回调
task->_release(); // 释放资源
}
tasks.clear(); // 清空当前 tick 的任务
}
// 定时器事件回调
void OnTime() {
int times = ReadTimerfd();
for (int i = 0; i < times; ++i) {
RunTimerTask(); // 执行定时任务
}
}
// 创建 timerfd
static int CreateTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0) {
LOG_ERROR("Create timerfd error");
abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
timerfd_settime(timerfd, 0, &itime, nullptr);
return timerfd;
}
// 读取 timerfd
int ReadTimerfd() {
uint64_t times = 0;
ssize_t ret = read(_timerfd, ×, sizeof(times));
if (ret < 0) {
LOG_ERROR("READ TIMERFD ERROR");
abort();
}
return times;
}
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
const int _capacity;
int _tick;
int _timerfd;
std::unique_ptr<Channel> _timer_channel;
EventLoop* _loop;
std::vector<std::vector<PtrTask>> _wheel; // 定时器轮
std::unordered_map<uint64_t, WeakTask> _timers; // ID 映射
};
server.cpp
#include "../../source/server.hpp"
void HandleClose(Channel *channel){
LOG_DEBUG("close fd: %d", channel->Fd());
channel->Remove(); // 移除事件
delete channel; // 释放内存
}
void HandleRead(Channel *channel){
int fd = channel->Fd();
char buf[1024] = {0};
ssize_t ret = recv(fd, buf, 1023, 0);
if(ret <= 0){
return HandleClose(channel); // 关闭事件
}
LOG_DEBUG("Read: %s", buf);
channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel *channel){
int fd = channel->Fd();
const char *data = "I miss You";
ssize_t ret = send(fd, data, strlen(data), 0);
if(ret < 0){
return HandleClose(channel); // 关闭事件
}
channel->DisableWrite(); // 关闭可写事件
}
void HandleError(Channel *channel){
return HandleClose(channel);
}
void HandlEvent(EventLoop* loop, Channel *channel, uint64_t timerid){
loop->RefreshTimer(timerid); // 刷新定时器
}
void Acceptor(EventLoop* loop, Channel *lst_channel)
{
int fd = lst_channel->Fd();
int newfd = accept(fd, nullptr, nullptr);
if(newfd < 0) return;
uint64_t timerid = rand() % 1000;
Channel *channel = new Channel(loop, newfd);
channel->SetReadCallback(std::bind(HandleRead, channel)); // 为通信套接字设置可读事件回调函数
channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件的回调函数
channel->SetCloseCallback(std::bind(HandleClose, channel)); // 关闭事件的回调函数
channel->SetErrorCallback(std::bind(HandleError, channel)); // 错误事件的回调函数
channel->SetEventCallback(std::bind(HandlEvent, loop, channel, timerid)); // 任意事件的回调函数
// 非活跃连接的超时释放操作 -- 5s 后关闭
// 注意: 定时销毁任务必须在启动读事件之前, 因为读事件会启动可写事件, 但这个时候还没有任务
loop->AddTimer(timerid, 5, std::bind(HandleClose, channel));
channel->EnableRead(); // 监听读事件
}
int main()
{
srand(time(nullptr)); // 随机数种子
EventLoop loop;
Socket lst_sock;
lst_sock.CreateServer(8080);
// 为监听套接字, 创建一个 Channel 进行事件的管理及处理
Channel channel(&loop, lst_sock.Fd());
channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 设置监听套接字的可读事件回调函数
channel.EnableRead();
while(1){
loop.Start(); // 事件循环
}
lst_sock.Close();
return 0;
}
client.cpp
#include "../../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
for(int i = 0; i < 3; ++i){
std::string str = "Hello IsLand";
cli_sock.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_sock.Recv(buf, 1023);
LOG_DEBUG("%s", buf);
sleep(1);
}
while(1) sleep(1);
return 0;
}
结果如下:
lighthouse@VM-8-10-ubuntu:Test4$ ./client
2025-05-05 22:53:49 [tcp_cli.cc:13] I miss You
2025-05-05 22:53:50 [tcp_cli.cc:13] I miss You
2025-05-05 22:53:51 [tcp_cli.cc:13] I miss You
^C
lighthouse@VM-8-10-ubuntu:Test4$ ./client
2025-05-05 22:54:00 [tcp_cli.cc:13] I miss You
2025-05-05 22:54:01 [tcp_cli.cc:13] I miss You
2025-05-05 22:54:02 [tcp_cli.cc:13] I miss You
^C
lighthouse@VM-8-10-ubuntu:Test4$ ./server
2025-05-05 22:53:49 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:53:50 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:53:51 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:53:55 [tcp_srv.cc:4] close fd: 7
2025-05-05 22:54:00 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:54:01 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:54:02 [tcp_srv.cc:16] Read: Hello IsLand
2025-05-05 22:54:04 [tcp_srv.cc:4] close fd: 7
^C
由于我之前在实现 TimeWheel 代码是这样写的,如下:
~TimerTask() {
if (!_canceled) {
std::thread(_cb).detach(); // ❌ 异步执行回调
}
_release();
}
这个写法会导致定时器回调(如 HandleClose
)在子线程中执行 ,而不是在 EventLoop
所属的线程中执行,然后就出现了如下的问题:
Assertion `IsInLoop()' failed.
Aborted (core dumped)
根本原因 是:在非事件循环线程中调用了 RemoveEvent
或 Channel::Remove()
,而 EventLoop
的所有操作都要求必须在事件循环线程中执行(通过 assert(IsInLoop())
检查)
分析
channel->Remove(); // 会调用 EventLoop::RemoveEvent
RemoveEvent
中有断言:assert(IsInLoop()); // 检查当前线程是否是事件循环线程
上面我们演示的时候,可以发现,当我们服务器端关闭再启动之后 fd 并没有发生改变,如下:
2025-05-05 22:53:55 [tcp_srv.cc:4] close fd: 7
2025-05-05 22:54:04 [tcp_srv.cc:4] close fd: 7
Linux 系统中,文件描述符的分配遵循 “最小可用原则” ,即:
分析
client
,都会创建一个新的 socket,系统返回一个可用的 fd。7
)。在 Acceptor
函数中,每当有新连接到来时:
int newfd = accept(fd, nullptr, nullptr);
Channel *channel = new Channel(loop, newfd);
newfd
是系统分配的文件描述符
如果前一个连接的 newfd
刚好是 7
,并且已经被关闭(close(7)
),那么下一个新连接就会再次分配 7
你定义的 Channel
类析构函数会 主动关闭 fd :
~Channel(){
if (_fd != -1) {
close(_fd);
_fd = -1;
}
}
这意味着每次连接关闭时,Channel
对象被销毁时会调用 close(fd)
,从而释放该 fd
释放后,系统可以再次分配该 fd 给新的连接
补充(关于这种做法的意义)
close(fd)
,系统会安全地回收和复用 fdChannel
对象,防止 fd 被占用不释放Channel
类中的 Remove
和 Update
方法为何调用 EventLoop
的接口?问题本质:模块职责分离
回答要点:
职责分离 :Channel
仅负责事件注册,Poller
负责底层 I/O 事件监控。
统一管理 :通过 EventLoop
统一管理事件增删改,确保事件状态一致性。
示例代码 :
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
问题本质:资源泄漏与逻辑错误
回答要点:
HasTimer
检查 :在添加定时任务前调用 HasTimer(id)
避免重复。RefreshTimer(id)
延迟销毁时间。EventLoop
串行化执行。