前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis0.1源码解析之事件驱动

redis0.1源码解析之事件驱动

作者头像
theanarkh
发布2020-05-27 23:41:12
3490
发布2020-05-27 23:41:12
举报
文章被收录于专栏:原创分享原创分享

redis的事件驱动模块负责处理文件和定时器两种任务。 下面是几个函数指针

代码语言:javascript
复制
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

下面是文件相关的结构体

代码语言:javascript
复制
 struct aeFileEvent {
    int fd;
    int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */
    aeFileProc *fileProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeFileEvent *next;
} aeFileEvent;

下面是定时器结构体

代码语言:javascript
复制
struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

下面是事件循环的核心结构体

代码语言:javascript
复制
struct aeEventLoop {
    // 定时器id,每创建一个定时器结构体,就加一
    long long timeEventNextId;
    // 两个链表
    aeFileEvent *fileEventHead;
    aeTimeEvent *timeEventHead;
    int stop;
} aeEventLoop;

下面先看一下一些基础函数,然后再分析具体流程。

1 创建一个事件循环结构体

代码语言:javascript
复制
aeEventLoop *aeCreateEventLoop(void) {
    aeEventLoop *eventLoop;
    eventLoop = zmalloc(sizeof(*eventLoop));
    if (!eventLoop) return NULL;
    eventLoop->fileEventHead = NULL;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    return eventLoop;
}

2 文件相关函数

2.1 新建一个文件相关的结构体

代码语言:javascript
复制
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    aeFileEvent *fe;

    fe = zmalloc(sizeof(*fe));
    if (fe == NULL) return AE_ERR;
    fe->fd = fd;
    fe->mask = mask;
    fe->fileProc = proc;
    fe->finalizerProc = finalizerProc;
    fe->clientData = clientData;
    fe->next = eventLoop->fileEventHead;
    eventLoop->fileEventHead = fe;
    return AE_OK;
}

2.2 删除一个文件相关的结构体

代码语言:javascript
复制
// 删除某个节点(fd和mask等于入参的节点)
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    aeFileEvent *fe, *prev = NULL;

    fe = eventLoop->fileEventHead;
    while(fe) {
        if (fe->fd == fd && fe->mask == mask) {
            // 说明待删除的节点是第一个节点,直接修改头节点的指针
            if (prev == NULL)
                eventLoop->fileEventHead = fe->next;
            else
                // 修改prev节点的next指针指向当前删除节点的下一个节点
                prev->next = fe->next;
            // 钩子函数
            if (fe->finalizerProc)
                fe->finalizerProc(eventLoop, fe->clientData);
            // 释放待删除节点的内存
            zfree(fe);
            return;
        }
        // 记录上一个节点,当找到待删除节点时,修改prev指针的next指针(如果prev非空)为待删除节点的下一个节点
        prev = fe;
        fe = fe->next;
    }
}

3 定时器相关函数

代码语言:javascript
复制
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    // 头插法插入eventLoop的timeEventHead队列
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

3.1 时间相关的函数

代码语言:javascript
复制
// 获取当前时间,秒和毫秒
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    *seconds = tv.tv_sec;
    *milliseconds = tv.tv_usec/1000;
}

static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;
    // 获取
    aeGetTime(&cur_sec, &cur_ms);
    // 绝对时间,秒数
    when_sec = cur_sec + milliseconds/1000;
    // 绝对时间,毫秒数
    when_ms = cur_ms + milliseconds%1000;
    // 大于一秒则进位到秒中
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    // 返回绝对时间的秒和毫秒
    *sec = when_sec;
    *ms = when_ms;
}

3.2 删除一个定时器结构体(参考删除文件相关数据结构的函数)

代码语言:javascript
复制
// 删除一个timeEvent节点
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te, *prev = NULL;

    te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            return AE_OK;
        }
        prev = te;
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}

3.3 查找最快到期的定时器节点

代码语言:javascript
复制
// 找出最快到期的节点
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        /*
            nearest记录当前最快到期的节点,初始化为NULL
            1 nearest为空,把当前节点作为最小值
            2 when_sec小的作为最小值
            3 when_sec一样的情况下,when_ms小者为最小值
        */
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

最后来看一下事件处理的逻辑,入口函数是

代码语言:javascript
复制
void aeMain(aeEventLoop *eventLoop)
{
    eventLoop->stop = 0;
    while (!eventLoop->stop)
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}

该函数由redis初始化时,main函数调用。这个版本使用的多路复用函数是select

代码语言:javascript
复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int maxfd = 0, numfd = 0, processed = 0;
    fd_set rfds, wfds, efds;
    aeFileEvent *fe = eventLoop->fileEventHead;
    aeTimeEvent *te;
    long long maxId;
    AE_NOTUSED(flags);

    // 两种类型的事件都不需要处理
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // 初始化select对应的结构体,读,写,异常三种事件
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    // 处理文件事件
    if (flags & AE_FILE_EVENTS) {
        while (fe != NULL) {
            // 根据需要处理的事件,设置对应的变量对应的位
            if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);
            if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);
            if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);
            // 记录最大文件描述符select的时候需要用
            if (maxfd < fe->fd) maxfd = fe->fd;
            // 标记是否有文件事件
            numfd++;
            fe = fe->next;
        }
    }
    // 有文件事件需要处理,或者有time事件并且没有设置AE_DONT_WAIT(设置的话就不会进入select定时阻塞)标记
    if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int retval;
        aeTimeEvent *shortest = NULL;
        /*
            struct timeval {
                long    tv_sec;         // seconds 
                long    tv_usec;        // and microseconds 
            };
        */
        struct timeval tv, *tvp;
        // 有time事件需要处理,并且没有设置AE_DONT_WAIT标记,则select可能会定时阻塞(如果有time节点的话)
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 找出最快到期的节点
            shortest = aeSearchNearestTimer(eventLoop);
        // 有待到期的time节点
        if (shortest) {
            long now_sec, now_ms;
            // 获取当前时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // 算出相对时间,秒数
            tvp->tv_sec = shortest->when_sec - now_sec;
            // 不够,需要借位
            if (shortest->when_ms < now_ms) {
                // 微秒
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                // 借一位,减一
                tvp->tv_sec --;
            } else {
                // 乘以1000,即微秒
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
        } else {
            // 没有到期的time节点
            // 设置了AE_DONT_WAIT,则不会阻塞在select
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 一直阻塞直到有事件发生
                tvp = NULL; /* wait forever */
            }
        }

        retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
        if (retval > 0) {
            fe = eventLoop->fileEventHead;
            while(fe != NULL) {
                int fd = (int) fe->fd;
                // 有感兴趣的事件发生
                if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
                    (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
                    (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
                {
                    int mask = 0;
                    // 记录发生了哪些感兴趣的事件
                    if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
                        mask |= AE_READABLE;
                    if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
                        mask |= AE_WRITABLE;
                    if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
                        mask |= AE_EXCEPTION;
                    // 执行回调
                    fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);
                    processed++;
                    /* After an event is processed our file event list
                     * may no longer be the same, so what we do
                     * is to clear the bit for this file descriptor and
                     * restart again from the head. */
                    /*
                        执行完回调后,文件事件队列可能发生了变化,
                        重新开始遍历
                    */
                    fe = eventLoop->fileEventHead;
                    // 清除该文件描述符
                    FD_CLR(fd, &rfds);
                    FD_CLR(fd, &wfds);
                    FD_CLR(fd, &efds);
                } else {
                    fe = fe->next;
                }
            }
        }
    }
    // 处理time事件
    if (flags & AE_TIME_EVENTS) {
        te = eventLoop->timeEventHead;
        // 先保存这次需要处理的最大id,防止在time回调了不断给队列新增节点,导致死循环
        maxId = eventLoop->timeEventNextId-1;
        while(te) {
            long now_sec, now_ms;
            long long id;
            // 在本次回调里新增的节点,跳过
            if (te->id > maxId) {
                te = te->next;
                continue;
            }
            // 获取当前时间
            aeGetTime(&now_sec, &now_ms);
            // 到期了
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;

                id = te->id;
                // 执行回调
                retval = te->timeProc(eventLoop, id, te->clientData);

                // 是否需要继续注册事件,是则修改超时时间,否则删除该节点
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }
                te = eventLoop->timeEventHead;
            } else {
                te = te->next;
            }
        }
    }
    // 处理的事件个数
    return processed; /* return the number of processed file/time events */
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 创建一个事件循环结构体
  • 2 文件相关函数
  • 3 定时器相关函数
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档