epoll 的实现比poll/select 复杂一些,这是因为: 1. epoll_wait, epoll_ctl 的调用完全独立开来,内核需要锁机制对这些操作进行保护,并且需要持久的维护添加到epoll的文件 2. epoll本身也是文件,也可以被poll/select/epoll监视,这可能导致epoll之间循环唤醒的问题 3. 单个文件的状态改变可能唤醒过多监听在其上的epoll,产生唤醒风暴
epoll各个功能的实现要非常小心面对这些问题,使得复杂度大大增加。
// epoll的核心实现对应于一个epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在这里
// f_op->poll() 使用的, 被其他事件通知机制利用的wait_address
wait_queue_head_t poll_wait;
/* 已就绪的需要检查的epitem 列表 */
struct list_head rdllist;
/* 保存所有加入到当前epoll的文件对应的epitem*/
struct rb_root rbr;
// 当正在向用户空间复制数据时, 产生的可用文件
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/*优化循环检查,避免循环检查中重复的遍历 */
int visited;
struct list_head visited_list_link;
}
// 对应于一个加入到epoll的文件
struct epitem {
// 挂载到eventpoll 的红黑树节点
struct rb_node rbn;
// 挂载到eventpoll.rdllist 的节点
struct list_head rdllink;
// 连接到ovflist 的指针
struct epitem *next;
/* 文件描述符信息fd + file, 红黑树的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 当前文件的等待队列(eppoll_entry)列表
// 同一个文件上可能会监视多种事件,
// 这些事件可能属于不同的wait_queue中
// (取决于对应文件类型的实现),
// 所以需要使用链表
struct list_head pwqlist;
// 当前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 传入的用户数据 */
struct epoll_event event;
};
struct epoll_filefd {
struct file *file;
int fd;
};
// 与一个文件上的一个wait_queue_head 相关联,因为同一文件可能有多个等待的事件,这些事件可能使用不同的等待队列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的节点
wait_queue_t wait;
// 文件wait_queue 头
wait_queue_head_t *whead;
};
// 用户使用的epoll_event
struct epoll_event {
__u32 events;
__u64 data;
} EPOLL_PACKED;
// epoll 文件系统的相关实现
// epoll 文件系统初始化, 在系统启动时会调用
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符数量
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化递归检查队列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分别用来分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0) {
return -EINVAL;
}
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC) {
return -EINVAL;
}
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0) {
return error;
}
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
// 设置epfd的相关操作,由于epoll也是文件也提供了poll操作
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
fd_install(fd, file);
ep->file = file;
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
递归深度检测(ep_call_nested)
epoll本身也是文件,也可以被poll/select/epoll监视,如果epoll之间互相监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。以下就是这一过程的源码。
struct nested_call_node {
struct list_head llink;
void *cookie; // 函数运行标识, 任务标志
void *ctx; // 运行环境标识
};
struct nested_calls {
struct list_head tasks_call_list;
spinlock_t lock;
};
// 全局的不同调用使用的链表
// 死循环检查和唤醒风暴检查链表
static nested_call_node poll_loop_ncalls;
// 唤醒时使用的检查链表
static nested_call_node poll_safewake_ncalls;
// 扫描readylist 时使用的链表
static nested_call_node poll_readywalk_ncalls;
// 限制epoll 中直接或间接递归调用的深度并防止死循环
// ctx: 任务运行上下文(进程, CPU 等)
// cookie: 每个任务的标识
// priv: 任务运行需要的私有数据
// 如果用面向对象语言实现应该就会是一个wapper类
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
int (*nproc)(void *, void *, int), void *priv,
void *cookie, void *ctx)
{
int error, call_nests = 0;
unsigned long flags;
struct list_head *lsthead = &ncalls->tasks_call_list;
struct nested_call_node *tncur;
struct nested_call_node tnode;
spin_lock_irqsave(&ncalls->lock, flags);
// 检查原有的嵌套调用链表ncalls, 查看是否有深度超过限制的情况
list_for_each_entry(tncur, lsthead, llink) {
// 同一上下文中(ctx)有相同的任务(cookie)说明产生了死循环
// 同一上下文的递归深度call_nests 超过限制
if (tncur->ctx == ctx &&
(tncur->cookie == cookie || ++call_nests > max_nests)) {
error = -1;
}
goto out_unlock;
}
/* 将当前的任务请求添加到调用列表*/
tnode.ctx = ctx;
tnode.cookie = cookie;
list_add(&tnode.llink, lsthead);
spin_unlock_irqrestore(&ncalls->lock, flags);
/* nproc 可能会导致递归调用(直接或间接)ep_call_nested
* 如果发生递归调用, 那么在此函数返回之前,
* ncalls 又会被加入额外的节点,
* 这样通过前面的检测就可以知道递归调用的深度
*/
error = (*nproc)(priv, cookie, call_nests);
/* 从链表中删除当前任务*/
spin_lock_irqsave(&ncalls->lock, flags);
list_del(&tnode.llink);
out_unlock:
spin_unlock_irqrestore(&ncalls->lock, flags);
return error;
}
循环检测(ep_loop_check)
循环检查(ep_loop_check),该函数递归调用ep_loop_check_proc利用ep_call_nested来实现epoll之间相互监视的死循环。因为ep_call_nested中已经对死循环和过深的递归做了检查,实际的ep_loop_check_proc的实现只是递归调用自己。其中的visited_list和visited标记完全是为了优化处理速度,如果没有visited_list和visited标记函数也是能够工作的。该函数中得上下文就是当前的进程,cookie就是正在遍历的epoll结构。
static LIST_HEAD(visited_list);
// 检查 file (epoll)和ep 之间是否有循环
static int ep_loop_check(struct eventpoll *ep, struct file *file)
{
int ret;
struct eventpoll *ep_cur, *ep_next;
ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, file, ep, current);
/* 清除链表和标志 */
list_for_each_entry_safe(ep_cur, ep_next, &visited_list,
visited_list_link) {
ep_cur->visited = 0;
list_del(&ep_cur->visited_list_link);
}
return ret;
}
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct eventpoll *ep = file->private_data;
struct eventpoll *ep_tovisit;
struct rb_node *rbp;
struct epitem *epi;
mutex_lock_nested(&ep->mtx, call_nests + 1);
// 标记当前为已遍历
ep->visited = 1;
list_add(&ep->visited_list_link, &visited_list);
// 遍历所有ep 监视的文件
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
if (unlikely(is_file_epoll(epi->ffd.file))) {
ep_tovisit = epi->ffd.file->private_data;
// 跳过先前已遍历的, 避免循环检查
if (ep_tovisit->visited) {
continue;
}
// 所有ep监视的未遍历的epoll
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, epi->ffd.file,
ep_tovisit, current);
if (error != 0) {
break;
}
} else {
// 文件不在tfile_check_list 中, 添加
// 最外层的epoll 需要检查子epoll监视的文件
if (list_empty(&epi->ffd.file->f_tfile_llink))
list_add(&epi->ffd.file->f_tfile_llink,
&tfile_check_list);
}
}
mutex_unlock(&ep->mtx);
return error;
}
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。