epoll在现在的软件中占据了很大的分量,nginx,libuv等单线程事件循环的软件都使用了epoll。之前分析过select,今天分析一下epoll。
们按照epoll三部曲的顺序进行分析。
asmlinkage long sys_epoll_create(int size)
{
int error, fd;
struct inode *inode;
struct file *file;
error = ep_getfd(&fd, &inode, &file);
error = ep_file_init(file);
return fd;
}
我们发现create函数似乎很简单。 1 操作系统中,进程和文件系统是通过fd=>file=>node联系起来的。ep_getfd就是在建立这个联系。
static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
// 获取一个file结构体
file = get_empty_filp();
// epoll在底层本身对应一个文件系统,从这个文件系统中获取一个inode
inode = ep_eventpoll_inode();
// 获取一个文件描述符
fd = get_unused_fd();
sprintf(name, "[%lu]", inode->i_ino);
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino;
// 申请一个entry
dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);
dentry->d_op = &eventpollfs_dentry_operations;
file->f_dentry = dentry;
// 建立file和inode的联系
d_add(dentry, inode);
// 建立fd=>file的关联
fd_install(fd, file);
*efd = fd;
*einode = inode;
*efile = file;
return 0;
}
形成一个这种的结构。
在这里插入图片描述 2 通过ep_file_init建立file和epoll的关联。
static int ep_file_init(struct file *file)
{
struct eventpoll *ep;
ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)
memset(ep, 0, sizeof(*ep));
// 一系列初始化
file->private_data = ep;
return 0;
}
在这里插入图片描述 epoll_create函数主要是建立一个数据结构。并返回一个文件描述符供后面使用。
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
// 不是删除操作则复制用户数据到内核
if (
EP_OP_HASH_EVENT(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event))
)
goto eexit_1;
// 根据一种的图,拿到epoll对应的file结构体
file = fget(epfd);
// 拿到操作的文件的file结构体
tfile = fget(fd);
// 通过file拿到epoll_event结构体,见上面的图
ep = file->private_data;
// 看这个文件描述符是否已经存在,epoll用红黑树维护这个数据
epi = ep_find(ep, tfile, fd);
switch (op) {
// 新增
case EPOLL_CTL_ADD:
// 还没有则新增,有则报错
if (!epi) {
epds.events |= POLLERR | POLLHUP;
// 插入红黑树
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
// 删除
case EPOLL_CTL_DEL:
// 存在则删除,否则报错
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
// 修改
case EPOLL_CTL_MOD:
// 存在则修改,否则报错
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
}
epoll_ctl函数看起来也没有很复杂,就是根据用户传进来的信息去操作红黑树。对于红黑树的增删改查,查和删除就不分析了。就是去操作红黑树。增和改是类似的逻辑,所以我们只分析增操作就可以了。在此之前,我们先了解一些epoll中其他的数据结构。
在这里插入图片描述 当我们新增一个需要监听的文件描述符的时候,系统会申请一个epitem去表示。epitem是保存了文件描述符、事件等信息的结构体。然后把epitem插入到eventpoll结构体维护的红黑树中。
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;
// 申请一个epitem
epi = EPI_MEM_ALLOC()
// 省略一系列初始化工作
// 记录所属的epoll
epi->ep = ep;
// 在epitem中保存文件描述符fd和file
EP_SET_FFD(&epi->ffd, tfile, fd);
// 监听的事件
epi->event = *event;
epi->nwait = 0;
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
// 把epitem插入红黑树
ep_rbtree_insert(ep, epi);
// 如果监听的事件在新增的时候就已经触发,则直接插入到epoll就绪队列
if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {
// 把epitem插入就绪队列rdllist
list_add_tail(&epi->rdllink, &ep->rdllist);
// 有事件触发,唤醒阻塞在epoll_wait的进程队列
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
}
新增操作的大致流程是 1 申请了一个新的epitem表示待观察的实体。他保存了文件描述符、感兴趣的事件等信息。 2 插入红黑树 3 判断新增的节点中对应的文件描述符和事件是否已经触发了,是则加入到就绪队列(由eventpoll->rdllist维护的一个队列) 下面具体看一下如何判断感兴趣的事件在对应的文件描述符中是否已经触发。相关代码在ep_insert中。下面单独拎出来。
/*
struct ep_pqueue {
// 函数指针
poll_table pt;
// epitem
struct epitem *epi;
};
*/
struct ep_pqueue epq;
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
}
上面的代码是定义了一个struct ep_pqueue 结构体,然后设置他的一个字段为ep_ptable_queue_proc。然后执行tfile->f_op->poll。poll函数由各个文件系统或者网络协议实现。我们以管道为例。
static unsigned int
pipe_poll(struct file *filp, poll_table *wait)
{
unsigned int mask;
// 监听的文件描述符对应的inode
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info = inode->i_pipe;
int nrbufs;
/*
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
*/
poll_wait(filp, PIPE_WAIT(*inode), wait);
// 判断哪些事件触发了
nrbufs = info->nrbufs;
mask = 0;
if (filp->f_mode & FMODE_READ) {
mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0;
if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
mask |= POLLHUP;
}
if (filp->f_mode & FMODE_WRITE) {
mask |= (nrbufs < PIPE_BUFFERS) ? POLLOUT | POLLWRNORM : 0;
if (!PIPE_READERS(*inode))
mask |= POLLERR;
}
return mask;
}
我们看到具体的poll函数里会首先执行poll_wait函数。这个函数只是简单执行struct ep_pqueue epq结构体中的函数,即刚才设置的ep_ptable_queue_proc。
// 监听的文件描述符对应的file结构体,whead是等待监听的文件描述符对应的inode可用的队列
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
pwq->wait->flags = 0;
pwq->wait->task = NULL;
// 设置回调
pwq->wait->func = ep_poll_callback;
pwq->whead = whead;
pwq->base = epi;
// 插入等待监听的文件描述符的inode可用的队列,回调函数是ep_poll_callback
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
主要的逻辑是把当前进程插入监听的文件的等待队列中,等待唤醒。
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
// 通过epoll的fd拿到对应的file结构体
file = fget(epfd);
// 通过file结构体拿到eventpoll结构体
ep = file->private_data;
error = ep_poll(ep, events, maxevents, timeout);
return error;
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
// 计算超时时间
jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;
retry:
res = 0;
// 就绪队列为空
if (list_empty(&ep->rdllist)) {
// 加入阻塞队列
init_waitqueue_entry(&wait, current);
add_wait_queue(&ep->wq, &wait);
for (;;) {
// 挂起
set_current_state(TASK_INTERRUPTIBLE);
// 超时或者有就绪事件了,则跳出返回
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
// 被信号唤醒返回EINTR
if (signal_pending(current)) {
res = -EINTR;
break;
}
// 设置定时器,然后进程挂起,等待超时唤醒(超时或者信号唤醒)
jtimeout = schedule_timeout(jtimeout);
}
// 移出阻塞队列
remove_wait_queue(&ep->wq, &wait);
// 设置就绪
set_current_state(TASK_RUNNING);
}
// 是否有事件就绪,唤醒的原因有几个,被唤醒不代表就有就绪事件
eavail = !list_empty(&ep->rdllist);
write_unlock_irqrestore(&ep->lock, flags);
// 处理就绪事件返回
if (!res && eavail &&
!(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
总的来说epoll_wait的逻辑主要是处理就绪队列的节点。 1 如果就绪队列为空,则根据timeout做下一步处理,可能定时阻塞。 2 如果就绪队列非空则处理就绪队列,返回给用户。处理就绪队列的函数是ep_events_transfer。
static int ep_events_transfer(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
int eventcnt = 0;
struct list_head txlist;
INIT_LIST_HEAD(&txlist);
if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
eventcnt = ep_send_events(ep, &txlist, events);
ep_reinject_items(ep, &txlist);
}
return eventcnt;
}
主要是三个函数,我们一个个看。 1 ep_collect_ready_items收集就绪事件
static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist, int maxevents)
{
int nepi;
unsigned long flags;
// 就绪事件的队列
struct list_head *lsthead = &ep->rdllist, *lnk;
struct epitem *epi;
for (nepi = 0, lnk = lsthead->next; lnk != lsthead && nepi < maxevents;) {
// 通过结构体字段的地址拿到结构体首地址
epi = list_entry(lnk, struct epitem, rdllink);
lnk = lnk->next;
/* If this file is already in the ready list we exit soon */
if (!EP_IS_LINKED(&epi->txlink)) {
epi->revents = epi->event.events;
// 插入txlist队列,然后处理完再返回给用户
list_add(&epi->txlink, txlist);
nepi++;
// 从就绪队列中删除
EP_LIST_DEL(&epi->rdllink);
}
}
return nepi;
}
2 ep_send_events判断哪些事件触发了
static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
struct epoll_event __user *events)
{
int eventcnt = 0;
unsigned int revents;
struct list_head *lnk;
struct epitem *epi;
// 遍历就绪队列,记录触发的事件
list_for_each(lnk, txlist) {
epi = list_entry(lnk, struct epitem, txlink);
// 判断哪些事件触发了
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
epi->revents = revents & epi->event.events;
// 复制到用户空间
if (epi->revents) {
if (__put_user(epi->revents,
&events[eventcnt].events) ||
__put_user(epi->event.data,
&events[eventcnt].data))
return -EFAULT;
// 只监听一次,触发完设置成对任何事件都不感兴趣
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
eventcnt++;
}
}
return eventcnt;
}
3 ep_reinject_items重新插入就绪队列
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
int ricnt = 0, pwake = 0;
unsigned long flags;
struct epitem *epi;
while (!list_empty(txlist)) {
epi = list_entry(txlist->next, struct epitem, txlink);
EP_LIST_DEL(&epi->txlink);
// 水平触发模式则一直通知,即重新加入就绪队列
if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
(epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ricnt++;
}
}
}
我们发现,并有没有在epoll_wait的时候去收集就绪事件,那么就绪队列是谁处理的呢?我们回顾一下插入红黑树的时候,做了一个事情,就是在文件对应的inode上注册一个回调。当文件满足条件的时候,就会唤醒因为epoll_wait而阻塞的进程。epoll_wait会收集事件返回给用户。
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
struct eventpoll *ep = epi->ep;
// 插入就绪队列
list_add_tail(&epi->rdllink, &ep->rdllist);
// 唤醒因epoll_wait而阻塞的进程
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
return 1;
}
在这里插入图片描述 epoll的实现涉及的内容比较多,先分析一下大致的原理。有机会再深入分析。