(1)心跳检测 (2)游戏中的技能冷却 (3)倒计时 (4)其他需要延迟处理的功能 定时器,就是指定某个时间处理某个事情。
对于服务器而言,驱动服务程序业务逻辑的事件包括网络事件、定时事件、以及信号事件;定时器触发形式通常有两种: 网络事件和定时事件在一个线程中处理;例如:nginx、redis、memcached。 网络事件和定时事件在不同线程中处理;例如:skynet。
网络事件和定时事件可以进行协同处理;即网络事件和定时事件在一个线程中处理。一般通过epoll_wait()的第四个参数作为延时触发定时器,业务逻辑的执行也在同一个线程中。 示例:
// ... 其他代码
while(!quit)
{
int timeout=get_nearest_timer()-now_time();
if(timeout<0)
timeout=-1;
int event_count=epoll_wait(epfd,ev,ev_num,timeout);
for(int i=0;i<event_count;i++)
{
// ... 处理网络事件
}
// 处理定时事件
update_timer();// 里面检测是否到时间处理定时任务,如果没有到时间直接返回
// ... 其他代码
}
// ... 其他代码
(1)epoll对事件的处理是通过epoll_ctl()添加事件,在epoll_wait()检测IO就绪进行异步处理事件。 (2)定时器设置时需要携带callback函数异步处理定时任务。 由此看出,因为reactor是基于事件的网络IO模型,IO的处理是同步的,事件的处理是异步的,而定时任务的处理也是异步的,所以事件的处理和定时任务的处理可以在一个线程中一起处理。
那么,如何进行协同处理呢? 可以利用IO多路复用,“阻塞”收集就绪事件的接口。如上面的示例代码。
(1)redis(单reactor) (2)memcached、nginx(多reactor)
(1)适用于定时任务少的情景。 (2)定时任务多了,会影响网络事件的处理。 (4)在多线程情况下,会引起事件处理不平衡。
函数调用框图:
具体代码:
// server.c
int main(int argc,char *argv[])
{
// ... 其他代码
aeMain(server.el);//事件循环
// ... 其他代码
}
// ... 其他代码
// ae.c
// ... 其他代码
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
// ... 其他代码
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// ... 其他代码
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);//获取最近定时器任务
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);// 进入epoll_wait(),处理IO事件
// ... 其他代码
}
// ... 其他代码
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);//处理定时器事件
return processed; /* return the number of processed file/time events */
}
// ... 其他代码
// ae_epoll.c
// ... 其他代码
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
// ... 其他代码
定时任务在通过一个单独的线程检测,利用usleep()/sleep()检测触发定时器,定时器事件的处理由其他线程或运行队列执行。 这种触发方式通常用于处理大量定时任务。 示例:
// 网络事件和定时事件在不同线程中处理
void * thread_timer(void *thread_param) {
init_timer();
while (!quit) {
update_timer();
usleep(t);
}
clear_timer();
return NULL;
}
pthread_create(&pid,NULL,thread_timer,¶ms);
处理方式:使用时间轮数据结构,在一个线程中利用usleep(time)负责检测(time要小于最小时间精度),时间到达时,通过信号或插入运行队列让其他线程运行业务逻辑;时间轮只负责检测。这种方式加锁粒度小。
接口设计是为了让用户方便的使用定时器。 最基础的接口有: (1)定时器初始化。 (2)添加定时器。 (3)删除定时器。 示例:
// 初始化定时器
void init_timer();
// 添加定时器
Node *add_timer(int expire,callback cb);
// 删除定时器
bool delete_timer(Node* node);
其他接口有: (4)找到最近要触发的定时任务。这种接口一般应用于网络事件和定时事件在一个线程中处理的触发方式。 (5)更新检测定时器。 (6)清除定时器。 示例:
// 找到最近要触发的定时任务
Node* find_nearest_timer();
// 更新检测定时器
void update_timer();
// 清除定时器
void clear_timer();
数据结构去组织定时任务的时候,其本质就是按照定时任务的优先级进行组织数据。所谓优先级,就是先触发的定时任务放在最前面。 组织方式:
其他数据数据结构:
(2)完全二叉树:若二叉树的深度为h,去掉了h层的节点,就是一个满二叉树;并且h层都集中在最左侧排序。
最小堆是一个完全二叉树;某个节点的值总是小于等于它的子节点 的值;堆中任意一个节点的子数都是最小堆。
最小堆添加节点:为了满足完全二叉树的定义,往二叉树最高层沿着最左侧添加一个节点;然后考虑是否能上升操作。比如往上图的最小堆添加值为 4 的节点,4 节点是 5 节点的左子树;4 比 5 小,4 和 5 需要交换值。
最小堆删除节点:删除操作需要先查找是否包含这个节点;确定存在后,交换最后一个节点,先考虑能否执行下降操作,否则执行上升操作;最后删除最后一个节点。 例如:上图中删除 1 号节点,需要下沉操作;
上图删除 9 号节点,需要上升操作;
时间轮数据结构:时间轮是根据时钟运行规律而来的。时间精度为1s,时间范围为12H,定义三个数组分别存 秒、分、时;一个指针一秒钟移动一次,只需关注最近一分钟内要触发的定时任务。
时间轮是根据时钟运行规律而来的。
原理图:
实现原理:时间精度为1s,时间范围为12H,定义三个指针数组分别指向 秒、分、时;利用一个移动指针一秒钟移动一次,只需关注最近一分钟内要触发的定时任务。
如果要加入超过一分钟的定时任务,需要将指针指向的时间加上需要定时的任务时间对60求余,向下取整。 比如要加入72s的定时任务,当前指针指向0,则插入的位置是分钟数组的(0+72)%60=1位置。
也就是说,一分钟以内的任务放在秒针层,大于一分钟小于一小时的定时任务放在分针层级,大于一小时小于12小时的定时任务放在时针层级。
tick的取值范围为时间范围,只需要一个指针记录。因为通过时间可以计算出秒针层的位置、分钟层位置以及小时层的位置;比如121s的秒针是1、分针是2,时针是0。 当秒针移动一圈,说明下一分钟的任务快执行了。
(1)减少空间占用。比如要表示12H的数组需要126060=43200个元素大小,分成三层只需要60+60+12=132个元素大小。 (2)只需关注最近一分钟内要触发的定时任务。 (3)按照任务的轻重缓急进行组织。时间在前的先处理。 (4)减少任务的检测。相同时间的定时任务放在一个链表中。
任务节点应该包含定时时间expire、回调函数callback以及任务链表next。任务链表主要解决相同触发时间的定时任务。
添加定时任务时,需要根据time判断将其放在哪一层。然后任务节点中的expire就等于当前的tick加上time,即expire=tick+time。
时间轮的时间进度是秒,只执行秒层的任务,所以需要将快到达的分钟层任务重新映射到秒层。 根据tick计算出分钟指针位置【分钟层指针位置=(tick/60)%60】,取出该指针指向槽位的所有任务(任务链表);重新计算时间 time=expire-tick;然后指向添加节点的业务逻辑。
时间轮删除节点不方便,一般节点不能删除,因为tick一直在移动,会出现重新映射,节点位置可能改变。 那么可以添加一个标记字段cancel,当任务触发时检查这个字段,如果cancel=true则不执行具体任务。
(1)Linux内核的定时任务 (2)游戏服务器框架 skynet (3)分布式消息队列 kafka (4)java网络库netty
游戏服务器框架,skynet。
skynet使用单reactor,应用于CPU密集型场景;skynet封装有actor的抽象进程,里面有消息对立;skynet有自己的线程池,线程池从actor中取出就绪的定时任务,多线程执行定时任务业务逻辑。
struct timer_event {
uint32_t handle;
int session;
};
// 定时任务
struct timer_node {
struct timer_node *next;
uint32_t expire;
};
// 任务链表
struct link_list {
struct timer_node head;
struct timer_node *tail;
};
struct timer {
struct link_list near[TIME_NEAR];
struct link_list t[4][TIME_LEVEL];
struct spinlock lock; // 自旋锁,多线程操作需要,时间复杂度O(1)
uint32_t time; //时间指针,范围是2^32*10ms
uint32_t starttime;
uint64_t current;
uint64_t current_point;
};
skynet的时间轮采用五层结构,时间精度为10ms(由gettime()函数确定)。每10ms时间指针移动一次。
时间精度:
static uint64_t
gettime() {
uint64_t t;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (uint64_t)ti.tv_sec * 100;
t += ti.tv_nsec / 10000000;
return t;
}
skynet是在多线程下运行,需要数据操作加锁。skynet对整个结构加锁,但时间轮的时间复杂度为O(1),可以使用自旋锁,在效率上不会产生影响。 skynet在添加节点和取出任务时需要加锁。
修改skynet的时间精度可以修改gettime()函数以及usleep(2500)。
红黑树和最小表需要对整个结构加锁。锁的粒度较大。如
lock(&mutex)
//执行临界资源
unlock(&mutex)
跳表粒度很小,基本上是原子操作。 数据结构的选择时,少量定时任务的情况下,可以选择红黑树或最小堆;大量定时任务情况下,选择时间轮。 操作时间复杂度小,为O(1)时,或者仅需要局部加锁,可以降低锁的粒度。
(1)确定时间范围。 (2)确定时间精度。如skynet有usleep()和gettime()确定 (3)确定时间层级。第一层组织最近关注的延时任务,是实际执行的层级;其他层级只负责向上一层级重新映射。 (4)实现添加节点接口。 (5)实现重新映射,每一次时间指针移动都需要判断是否可重新映射。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。