前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis 事件机制是如何实现的?

Redis 事件机制是如何实现的?

作者头像
LinkinStar
发布2023-10-18 14:25:52
1990
发布2023-10-18 14:25:52
举报
文章被收录于专栏:LinkinStar's BlogLinkinStar's Blog

前言

我们都知道,Redis 是单线程(非严谨),你是否想过,一个线程要如何处理来自各个客户端的各种请求呢?它忙的过来吗?没错,它还真的能忙过来,并且还井井有条。其中多亏了 IO 多路复用,而不仅仅是它,事件机制在其中也是一个不错的设计。

之前我提到过有关于 IO 多路复用对于 Redis 的影响,IO多路复用和多线程会影响Redis分布式锁吗? 其中有部分内容其实已经提到了,所以本文会更加关注于事件机制本身。

PS:Redis 高版本已经支持多线程处理某些事情,为了简化,这里不做讨论,故下文出现的单线程仅是描述那些必须单线程执行的场景。

前置知识

尝试思考

首先,让我们来思考一下,如果是我们自己来实现,会尝试如何去做。

对于请求连接处理的思考

最笨的方法,那么就是来一个客户端 accept 一次,然后给什么请求做什么事情,先来先做,做完走人,对吧。那显然这样太慢了,要知道作为一个缓存,这样设计要把人给急死。

当然,我们也可以说,来一个我开一个线程单独处理你,相当于你一来我就单独找人为你服务,而服务的人最终会将请求给到一个处理中心,让处理中心统一去处理,然后将结果返回。但显然 Redis 没有那么多资源让你浪费。

于是要找人帮忙,那就是 IO 多路复用,至少它能帮我解决前面服务的问题,fd 我就不管了,直接告诉我哪些人来了,并且告诉我有事的是那些人。

反观机制的思考

既然 epoll_wait 能 告诉我们有那些 socket 已经就绪,那么我们就处理就绪的这些就可以了。但我们需要一个合理的机制来帮我们来优雅的处理他们,毕竟 Redis 后面只有个单线程在处理。由于处理没这么快,肯定需要一个地方来存放未处理的这些事件,那很合理就能想到需要一个类似 buffer 的东西。

所以,对于这个事件机制,我第一个想法就是弄个队列,或者 ringbuffer 来搞,那不就是一个生产消费者模型吗?

事件机制

那么下面我们就来看看 Redis 它是如何设计。

分类

首先 Redis 分了两类事件

  • fileEvents 文件事件,就是我们之前提到的请求的处理,我们也主要讨论这个
  • timedEvents 定时事件,没错肯定有一些定时任务触发的事件在里面

文件事件处理

OK,看完图我们就有了一个大致的印象,为了灵活的处理不同的事件,需要将事件分配给处理器去处理,这里也是我们之前思考的时候没有想到的一个设计。通常来说对于任何的处理往往都有这样一个分配器去分配所有的任务,这样可以让扩展更加灵活,如果后续有新的类型,只需要扩展出一个新的处理器就可以了。

源码分析

https://github.com/redis/redis/blob/9b1d4f003de1b141ea850f01e7104e7e5c670620/src/ae.c#L493

首先入口在 aeMain 这个简单,就是循环,也正是这个循环处理着所有的事件,我们可以看到,只要不停(stop),就会一直循环处理

代码语言:javascript
复制
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

然后就是我们重点的 aeProcessEvents 方法,其中重点就是调用 aeApiPoll 获取当前就绪的事件,然后你就能看到我们的 aeFileEvent 也就是文件事件了,最后还有 processTimeEvents 处理定时事件。那么事件本身,是如何处理的呢?就是 rfileProc 和 wfileProc 一个处理读一个处理写。那么问题来了,这两个方法具体是什么呢?卖个关子,我们先瞅一眼 aeApiPoll

代码语言:javascript
复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
        int64_t usUntilTimer;

        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
            eventLoop->beforesleep(eventLoop);

        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else if (flags & AE_TIME_EVENTS) {
            usUntilTimer = usUntilEarliestTimer(eventLoop);
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            }
        }
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. 注意这里!!!!!!!!!!!!!! */
        numevents = aeApiPoll(eventLoop, tvp);

        /* Don't process file events if not requested. */
        if (!(flags & AE_FILE_EVENTS)) {
            numevents = 0;
        }

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                /* rfileProc 在处理什么事件呢? */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* wfileProc 在处理什么事件呢? */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

这里其他都不重要,重点就在我们熟悉的 epoll_wait ,获取所有就绪的 fd 也就能知道所有需要处理的事件了。

代码语言:javascript
复制
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}

好了,我们来解密究竟 rfileProcwfileProc 是什么,aeCreateFileEvent 方法是用于创建 FileEvent 的方法,其中的入参里面有 aeFileProc 没错就是它了。根据不同的类型用不同的 handler 创建不同的 event。也就是说,最终的处理方式是通过参数传递进去的。

代码语言:javascript
复制
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

小思考🤔

如果是我设计,或许绝大多数情况下就是弄一个对象,而对象根据具体的事件类型执行不同的处理逻辑。最多用一个 策略模式 可能就上天了。而 Redis 的这样的设计思路,类似一种闭包的设计,或者说函数式编程的一种思路吧,将具体的处理对象,处理方式,处理结果,通通包含在内。我们先不说这样的设计好不好,但给我的第一印象是,这样的设计会让我觉得最终执行的整个处理会更加连贯,并且处理的时候执行的全部逻辑是高度一致的,而处理方式的本身真正做到了可扩展

总结

那我们通过 Redis 的事件机制能学到什么呢?

  1. 这个事件机制的模型很通用也很清晰,包含:接收、循环、处理,三个部分,很标准的设计
  2. 其中对于任务的处理有一个专门的分配器去分配,这在很多 handler 的设计中非常实用,熟悉 java 的同学应该知道 DispatcherServlet 没错这样的模型会更加的清晰
  3. 易于扩展,这里的扩展有两方面一方面是对于处理器的扩展,之后有其他事件类型只需要增加事件处理器就可以了;而另一方面这里的扩展还包括了多线程的扩展,方便了同时支持多个事件的处理。 其实,Redis 的事件机制是一个标准的 Reactor模式 是一种基于事件驱动的设计模式,所以我们更多的是要学到这样设计模式,来运用到以后的编码中,可以更清晰也易扩展。

参考链接

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-07-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 前置知识
  • 尝试思考
    • 对于请求连接处理的思考
      • 反观机制的思考
      • 事件机制
        • 分类
          • 文件事件处理
            • 源码分析
              • 小思考🤔
              • 总结
              • 参考链接
              相关产品与服务
              云数据库 Redis
              腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档