上一篇文章中,我们介绍了编程思想中的 Reactor 与 Proactor 两种设计模式: 程序设计中的两大经典模式 — Reactor & Proactor
由于 windows 平台下,Proactor 思想的 IOCP 拥有强大的性能,因此通过 IOCP 实现的 apache 在 windows 环境下有着卓越的性能。 但在 linux 环境下,Proactor 的实现 aio 就显得不那么完善且难以使用了,因此,linux 环境中 Reactor 思想的使用更为常见,例如 nginx、redis 都是 Reactor 实现事件驱动的优秀例子。 此前我们已经介绍过 nginx 事件驱动,本文,我们就来详细介绍一下 redis 是的事件驱动是如何工作的。
下图展示了 redis 的事件驱动模型:
如图所示,redis 是一个典型的 Reactor 模式的通信系统。 他通过 IO 复用模块封装系统多路复用 IO,实现了对多个 fd 状态的监听。 当被监听的多个 fd 中,有若干个进入到就绪状态,redis 的事件分发器就会根据具体事件的类型调用对应的事件处理器来进行处理。
redis 源码中有以下几个 ae 作为前缀的文件:
他们就是对 Reactor 模式的封装,本文,我们重点关注基础的 ae.h、ae.c、ae_epoll.c 中基本的事件结构的定义和对 epoll 的封装。
ae.h 中声明了基础的事件存储结构以及事件的处理函数。
redis 中,事件分为下面三种类型:
下面就是存储上述三个事件的结构声明:
/* File event structure */
typedef struct aeFileEvent {
// 读或写标识
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
// 读事件处理函数
aeFileProc *rfileProc;
// 写事件处理函数
aeFileProc *wfileProc;
// 传递给上述两个函数的参数数据
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
// 事件 ID,唯一标识一个时间事件,用于查询和删除
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 事件处理函数
aeTimeProc *timeProc;
// 事件被删除前进行调用的处理函数
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
可以看到,时间事件结构中,具有前驱、后继两个指针,构成了一个双向链表结构,操作时间事件的相关函数封装在 ae.c 文件中,这部分我们下一篇文章再来介绍,本文我们主要介绍读写事件。
时间事件结构与读写事件结构中,都有着指向各自的处理函数的函数指针,他们的声明如下:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
事件处理函数的首个参数都是一个 aeEventLoop 结构的指针,这就是事件循环结构,他维护了整个 redis 体系内正在执行的或已经触发的全部事件集合:
typedef struct aeEventLoop {
// 最大文件描述符的值
int maxfd; /* highest file descriptor currently registered */
// 当前文件描述符的容量
int setsize; /* max number of file descriptors tracked */
// 下一个将产生的时间事件 id
long long timeEventNextId;
// 下一次将要触发时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 文件读写事件列表,数组下标为 fd
aeFileEvent *events; /* Registered events */
// 已触发事件列表
aeFiredEvent *fired; /* Fired events */
// 时间事件头指针
aeTimeEvent *timeEventHead;
// 是否已停止,0. 未停止,1. 已停止
int stop;
// 各个多路复用 IO 的底层数据,例如针对 epoll 来说,存储 epoll fd 和 epoll_event
void *apidata; /* This is used for polling API specific data */
// 在调用 processEvent 前的处理函数
aeBeforeSleepProc *beforesleep;
// 在调用 processEvent 后的处理函数
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
事件循环结构通过 void * 类型的 apidata 实现了对底层实现的隐藏,他存储了所有的事件,在整个事件驱动中起到提纲挈领的作用。
ae.c 中,定义了函数 aeCreateEventLoop,用来实现整个事件循环的创建和初始化工作:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
可以看到,这里并没有对 apidata 字段进行赋值,别急,下文马上就会来介绍具体的实现。
在 ae.c 中,通过宏实现了不同平台下对不同 IO 复用函数的选择:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
而在 config.h 中则有如下宏定义:
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif
可以看到,通过 gcc 编译过程中提供的宏,redis 实现了在不同平台使用不同 IO 复用函数的功能,本文我们重点关注 linux 平台下使用的 epoll 模块。
了解了具体事件的封装结构,接下来我们以 epoll 为例,看看 redis 是如何封装具体的操作的,这部分代码在 ae_epoll.c 中。
上文提到,事件循环结构中拥有一个 apidata 指针,用于对底层实现的隐藏,对于 epoll 来说,需要在整个事件驱动执行中存储 epoll_fd 和 epoll_event 列表,所以 ae_epoll.c 中首先对这两部分数据进行了封装:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
具体的 ae_xxx.c 中,定义并实现了下面几个函数,从而实现了具体的 IO 复用函数的封装:
// 创建新的 epoll 实例
static int aeApiCreate(aeEventLoop *eventLoop);
// 调整事件槽大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
// 释放 epoll 实例和事件槽
static void aeApiFree(aeEventLoop *eventLoop) {
// 增加新的事件到事件槽
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
// 从事件槽中删除事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
// 获取可执行事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
在 ae_epoll.c 中,上述几个函数是分别对 epoll 函数的封装:
这样看起来,就已经非常清晰了,实际上,他们只是对 epoll 函数的简单封装,结合之前文章对于 epoll 用法的介绍,代码阅读起来已经没有什么障碍了
具体代码参看附录。
通过上面的一系列函数,最终,通过 aeApiPoll(),redis 可以获取到已就绪的事件,接下来就是最为重要的事件分发过程了。 事件分发器实现了对文件事件与时间事件的分发处理,它是由 ae.c 文件中的 aeProcessEvents() 函数实现的。 这里,我们暂且不关注时间事件的处理,重点看下文件事件是如何实现的。
// 通过 aeApiPoll 获取已就绪事件列表
numevents = aeApiPoll(eventLoop, tvp);
// 读写事件处理后回调
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 事件分发
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取当前 fd 对应的事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
// 读事件操作
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
// 写事件操作
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 读写事件操作(读写事件需要先写后读)
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
事件分发整体逻辑非常简单,根据预先注册的事件回调依次调用即可,但这里有个细节,即除了读写两种事件外,还存在第三种 mask — AE_BARRIER,这是什么呢? 在事件分发函数中,某个事件可能既需要读,又需要写,此时 fe->mask & AE_WRITABLE 与 fe->mask & AE_READABLE 均返回 true,那么,在这种情况下,究竟应该先还是先写呢?显然这是另一个需要指定的属性,那么实现方式有两种:
显然,第二种方式占用空间更少,实现更为简洁,redis 就采用了这种方法,实现了先写后读类型的事件处理。 那么,“先写后读”的调用方式究竟应用在哪里呢? redis 官方文档中有一个例子:如果在 beforesleep 回调中进行了 fsync 动作,然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,用来翻转处理事件顺序了。 redis 中有多种多样的文件事件处理器以及时间事件处理器,具体的处理器我们后文再来介绍。
下面的示意图展示了 Redis 事件驱动的执行流程:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
aeApiCreate 实现了通过调用 epoll_create 生成 epoll_fd,并且创建事件槽,并且将由此构建的 aeApiState 结构实例赋值给此前已经初始化的事件循环结构的 apidata 字段。
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;
state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->epfd);
zfree(state->events);
zfree(state);
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
这个函数做了两件事:
需要注意的是,eventLoop 中,events 数组的下标为 fd,这是一种最简单的以空间换取时间的做法,虽然可能会有数组中某个位置会出现空缺,却能保证在 O(1) 时间内查找到 fd 对应的事件结构。
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
这个函数与 aeApiAddEvent 类似,同样做了两件事:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
aeApiPoll 是对 epoll_wait 的封装,他通过遍历 epoll_wait 的返回列表,获取已就绪的读写事件并返回。