前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis主循环源码分析

redis主循环源码分析

原创
作者头像
冰寒火
修改2022-11-07 00:59:00
8510
修改2022-11-07 00:59:00
举报
文章被收录于专栏:软件设计

一、背景

redis是内存数据库,并且是单线程,为什么单线程也能够这么快?

因为,所有线上请求的set、get操作都是在内存中,涉及到磁盘和网络的部分都是由后台线程执行,尽量减少了主线程的开销。单线程只是说对字典空间set、get时是单线程的,不需要同步机制,而将数据在用户空间和socket buffer之间的拷贝是由io_thread_list做的,其中主线程也算是其中一个io_thread。

二、epoll使用案例

为了降低redis理解时的复杂度,先给出一个epoll使用时的case,有助于后面的理解。

代码语言:c
复制
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include<errno.h>
#include<fcntl.h>
#define _BACKLOG_ 5
#define _BUF_SIZE_ 10240
#define _MAX_ 64


typedef struct _data_buf
{
    int fd;
    char buf[_BUF_SIZE_];
}data_buf_t,*data_buf_p;
static void usage(const char* proc)
{
    printf("usage:%s[ip][port]\n",proc);
}

static int start(int port,char *ip)
{
    assert(ip);
    int sock=socket(AF_INET,SOCK_STREAM,0);
    if(sock<0)
    {
        perror("socket");
        exit(1);
    }

    struct sockaddr_in local;
    local.sin_port=htons(port);
    local.sin_family=AF_INET;
    local.sin_addr.s_addr=inet_addr(ip);

    int opt=1;  //设置为接口复用
    setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

    if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
    {
        perror("bind");
        exit(2);
    }

    if(listen(sock,_BACKLOG_)<0)
    {
        perror("listen");
        exit(3);
    }
    return sock;
}

static int epoll_server(int listen_sock)
{
    int epoll_fd=epoll_create(256);//生成一个专用的epoll文件描述符
    if(epoll_fd<0)
    {
        perror("epoll_create");
        exit(1);
    }

    struct epoll_event ev;//用于注册事件
    struct epoll_event ret_ev[_MAX_];//数组用于回传要处理的事件
    int ret_num=_MAX_;
    int read_num=-1;
    ev.events=EPOLLIN;
    ev.data.fd=listen_sock;
    //注册监听套接字
    if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ev)<0)//用于控制某个文件描述符上的事件(注册,修改,删除)
    {
        perror("epoll_ctl");
        return -2;
    }

    int done=0;
    int i=0;
    int timeout=5000;
    struct sockaddr_in client;
    socklen_t len=sizeof(client);
    while(!done)
    {
        //ret_ev存放的是触发的事件
        switch(read_num=epoll_wait(epoll_fd,ret_ev,ret_num,timeout))//用于轮寻I/O事件的发生
        {
            case 0:                                                                          
                printf("time out\n");
                break;
            case -1:
                perror("epoll");
                exit(2);
            default:
                {
                    for(i=0;i<read_num;++i)
                    {
                        //连接建立事件
                        if(ret_ev[i].data.fd==listen_sock&&(ret_ev[i].events&EPOLLIN))
                        {
                            int fd=ret_ev[i].data.fd;
                            int new_sock=accept(fd,(struct sockaddr*)&client,&len);
                            if(new_sock<0)
                            {
                                perror("accept");
                                continue;
                            }

                            ev.events=EPOLLIN;
                            ev.data.fd=new_sock;
                            epoll_ctl(epoll_fd,EPOLL_CTL_ADD,new_sock,&ev);
                            printf("get a new client...\n");
                        }
                        else  //normal sock
                        {
                            //读事件
                            if(ret_ev[i].events&EPOLLIN)
                            {
                                int fd=ret_ev[i].data.fd;
                                data_buf_p mem=(data_buf_p)malloc(sizeof(data_buf_t));
                                if(!mem){
                                    perror("malloc failed...");                                                                      
                                    continue;
                                }
                                mem->fd=fd;
                                memset(mem->buf,'\0',sizeof(mem->buf));
                                ssize_t _s=read(mem->fd,mem->buf,sizeof(mem -> buf)-1);
                                if(_s>0)
                                {
                                    mem->buf[_s-1]='\0';
                                    printf("client: %s\n",mem->buf);
                                    ev.events=EPOLLOUT;
                                    ev.data.ptr=mem;
                                    epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
                                }
                                else if(_s==0)
                                {
                                    printf("client close...\n");
                                    epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
                                    close(fd);
                                    free(mem);
                                }
                            }
                            else if(ret_ev[i].events&EPOLLOUT)  //写事件准备就绪
                            {
                                data_buf_p mem=(data_buf_p)ret_ev[i].data.ptr;
                                char* msg="http/1.0 200 ok\r\n\r\nhello bit\r\n";
                                int fd=mem->fd;
                            
                                write(fd,msg,strlen(msg));
                                close(fd);
                                epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,&ev);  //写完服务端直接退出
                                free(mem);
                            }
                        }
                    }
                }
        }
    }
}

int main(int argc,char* argv[])
{
    if(argc!=3)
    {
        usage(argv[0]);
        return 1;
    }

    int port=atoi(argv[2]);
    char *ip=argv[1];
    //创建监听套接字
    int listen_sock=start(port,ip);
    printf("listening... ip: %s, port: %d\n",ip,port);
    //注册监听套接字事件,accept新链接并注册进入epoll
    epoll_server(listen_sock);
    close(listen_sock);
    return 0;
}

主要流程如下:

  1. 创建socket,并转化成为listen socket。
  2. 将listen socket fd注册到epoll中,epollfd是用来标识epoll这个对象的,处处皆文件?。
  3. 如果触发事件的fd是listen socket fd,那表示来了新连接,
    • 先要epoll_wait获取触发的事件;
    • 调用accept创建新连接newfd;
    • 将newfd注册读事件到epoll中。
  4. 如果触发的是写事件,则调用write,并删除这个事件。
  5. 如果触发的是读事件,则调用read获取请求,并投递到业务线程池,执行完后会注册一个写事件到达epoll中。服务端一般采用ET模式,没读完下次也不会返回这个事件,减少系统调用次数及上下文切换,所以读时一般会用while循环一次性读完。
epoll case
epoll case

三、redis事件注册

事件循环
事件循环

redis初始化时会创建所有的listen socket并注册到eventloop中监听,绑定accept Handler,当连接建立时就会accept新连接,创建client并向eventloop注册读事件。

下面主要会从以下几方面叙述:

  1. 事件的注册逻辑,包括listen socket fd以及client fd。
  2. 读事件分发处理逻辑。
  3. 过期key逻辑。
  4. 写事件分发处理逻辑。
  5. 监听器暂时不讲。

事件注册包括创建listen socket,注册事件,设置handler等。

事件分发流程如下所示,后面会慢慢介绍。

事件分发流程
事件分发流程

1 数据结构

代码语言:c
复制
//只涉及后面用到的字段
struct redisServer {
    redisDb *db; 
    
    aeEventLoop *el; //epoll

    socketFds cfd;              /*  listening socket */
    list *clients;              /* List of active clients */
    list *clients_to_close;     /* Clients to close asynchronously */
    list *clients_pending_write; /* There is to write or install handler. */
    list *clients_pending_read;  /* Client has pending read socket buffers. */
    client *current_client;    
}

//io多路复用

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* 当前所有注册的事件,包括已经触发和没有触发的*/
    aeFiredEvent *fired; /* 本次触发的事件*/
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* aeApiState */
    aeBeforeSleepProc *beforesleep; //预处理,很重要,请求分发逻辑
    aeBeforeSleepProc *aftersleep; //尾处理
    int flags;
} aeEventLoop;

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

2 创建listen socket

这一阶段完成了socket、bind、listen三步。

代码语言:c
复制
int listenToPort(int port, socketFds *sfd) {
    int j;
    char **bindaddr = server.bindaddr;

    /* If we have no bind address, we don't listen on a TCP socket */
    if (server.bindaddr_count == 0) return C_OK;

    for (j = 0; j < server.bindaddr_count; j++) {
        char* addr = bindaddr[j];
        int optional = *addr == '-';
        if (optional) addr++;
        if (strchr(addr,':')) {
            /* Bind IPv6 address. */
            sfd->fd[sfd->count] = anetTcp6Server(server.neterr,port,addr,server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            //socket、bind、listen这三步,返回的是listen socket fd
            sfd->fd[sfd->count] = anetTcpServer(server.neterr,port,addr,server.tcp_backlog);
        }
        if (sfd->fd[sfd->count] == ANET_ERR) {
            int net_errno = errno;
            serverLog(LL_WARNING,
                "Warning: Could not create server TCP listening socket %s:%d: %s",
                addr, port, server.neterr);
            if (net_errno == EADDRNOTAVAIL && optional)
                continue;
            if (net_errno == ENOPROTOOPT     || net_errno == EPROTONOSUPPORT ||
                net_errno == ESOCKTNOSUPPORT || net_errno == EPFNOSUPPORT ||
                net_errno == EAFNOSUPPORT)
                continue;

            /* Rollback successful listens before exiting */
            closeSocketListeners(sfd);
            return C_ERR;
        }
        if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
        anetNonBlock(NULL,sfd->fd[sfd->count]);
        anetCloexec(sfd->fd[sfd->count]);
        sfd->count++;
    }
    return C_OK;
}

3 将listen socket fd注册到eventloop中监听

代码语言:c
复制
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int j;
	//对每个listen socket进行
    for (j = 0; j < sfd->count; j++) {
        //将listen socket fd注册读事件到epoll中监听,当触发时执行accept_handler
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

//ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
	//注册事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

//networking.c
//listen socket fd触发事件时执行的handler,用于建立新连接
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        //接受新连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        //将cfd封装成connection并创建client,并且会将新连接注册到eventloop中
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

将每个listen socket fd都以读事件的形式注册到eventloop中并设置accept Handler,一旦事件触发,就执行accept Handler接收新连接。

4 注册client读写事件

代码语言:c
复制
struct connection {
    ConnectionType *type;
    ConnectionState state;
    short int flags;
    short int refs;
    int last_errno;
    void *private_data;
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;
};

typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    connection *conn;
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */

    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    int argv_len;           /* Size of argv array (may be more than argc) */
    int original_argc;      /* Num of arguments of original command if arguments were rewritten. */
    robj **original_argv;   /* Arguments of original command if arguments were rewritten. */
    
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    struct redisCommand *realcmd; /* The original command that was executed by the client,
                                     Used to update error stats in case the c->cmd was modified
                                     during the command invocation (like on GEOADD for example). */
    user *user;             /* User associated with this connection. If the
                               user is set to NULL the connection can do
                               anything (admin). */
   
    list *reply;            /* List of reply objects to send to the 
    multiState mstate;      /* MULTI/EXEC state */

    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */

    listNode *client_list_node; /* list node in client list */
    listNode *postponed_list_node; /* list node within the postponed list */
    listNode *pending_read_list_node; /* list node in clients pending read list */
    char *buf;
} client;
//创建connection
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
   //...
    /* Create connection and client */
    //创建client并向eventloop注册事件
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    c->flags |= flags;

    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}

这一阶段主要是accept新连接后,会为新连接创建client和connection这两个对象,connection主要是对tcp连接的操作,client承载了会话期间的数据。最后将client添加到server.clients中,以及将cfd以读事件注册到eventloop里面。

代码语言:c
复制
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
            //注册事件
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
    selectDb(c,0);

    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
    //向eventloop注册一个读事件监听后续变化
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

这一步主要是建立客户端连接并封装成client,注册到eventloop监听后续的请求。下面是client的连接相关的函数。

代码语言:c
复制
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .writev = connSocketWritev,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

三、beforeSleep

beforeSleep是事件分发的核心,包括分发读事件、写事件、删除过期键等逻辑。

1 handleClientsWithPendingReadsUsingThreads

处理读事件
处理读事件
代码语言:c
复制
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    //读事件被唤醒后就会加入到clients_pending_read,用于分发给io线程组
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        //分发任务给多线程,每个线程有一个client链表
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        //io线程会自旋等待,当count>0表示有新任务需要处理
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //读取数据到buffer并解析请求及参数
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    //主线程阻塞等待io线程读完数据
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        //移除掉准备执行的client
        listDelNode(server.clients_pending_read,ln);
        c->pending_read_list_node = NULL;

        serverAssert(!(c->flags & CLIENT_BLOCKED));

        if (beforeNextClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* Once io-threads are idle we can update the client in the mem usage */
        updateClientMemUsage(c);

        if (processPendingCommandAndInputBuffer(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not put the client in pending write queue (it can't).
         */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            putClientInPendingWriteQueue(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

主线程将apiPoll拉取的一批事件分发给io_thread_list,让io thread来从socket buffer拷贝数据并解码,主线程负责io_thread_list0的事件处理,处理完后主线程会while(1) 自旋等待其他io thread处理结束。然后主线程在执行每个client的命令,并从server.client_pending_read上移除掉这些client。

迭代每一个处理过的client,执行它的命令。

代码语言:c
复制
int processPendingCommandAndInputBuffer(client *c) {
    if (c->flags & CLIENT_PENDING_COMMAND) {
        c->flags &= ~CLIENT_PENDING_COMMAND;
        if (processCommandAndResetClient(c) == C_ERR) {
            return C_ERR;
        }
    }
    if (c->querybuf && sdslen(c->querybuf) > 0) {
        return processInputBuffer(c);
    }
    return C_OK;
}

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    //执行命令
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
        updateClientMemUsage(c);
    }

    if (server.current_client == NULL) deadclient = 1;
    server.current_client = old_client;
    return deadclient ? C_ERR : C_OK;
}
int processCommand(client *c) {
    if (!scriptIsTimedout()) {
        serverAssert(!server.in_exec);
        serverAssert(!scriptIsRunning());
    }

    moduleCallCommandFilters(c);



    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
    sds err;

    //...各种拒绝的逻辑
   
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand &&
        c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand &&
        c->cmd->proc != watchCommand &&
        c->cmd->proc != quitCommand &&
        c->cmd->proc != resetCommand)
    {
        //事务,请求会进入multistate队列,等exec到达后一起执行
        queueMultiCommand(c, cmd_flags);
        addReply(c,shared.queued);
    } else {
        //单一命令
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

请求的解析过程是在readQueryFromClient中,将数据从socket buffer拷贝到用户buffer中,并解析参数和命令。主线程执行命令,如果是事务且非exec命令,则进入multiState队列,否则执行。

读事件执行完后会调用addReply函数将响应事件添加到server.clients_pending_write队列。

代码语言:c
复制
void addReply(client *c, robj *obj) {
    //client进入写队列
    if (prepareClientToWrite(c) != C_OK) return;
    //写入返回的数据
    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        _addReplyToBufferOrList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
int prepareClientToWrite(client *c) {
   //...
    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
    if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
        putClientInPendingWriteQueue(c);

    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}
void putClientInPendingWriteQueue(client *c) {
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        //添加到写队列上
        listAddNodeHead(server.clients_pending_write,c);
    }
}

2 处理过期键

代码语言:c
复制
void activeExpireCycle(int type) {
    /* Adjust the running parameters according to the configured expire
     * effort. The default effort is 1, and the maximum configurable effort
     * is 10. */
    unsigned long
    effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */


    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        //遍历每一个数据库
        /* Expired and checked in a single loop. */
        unsigned long expired, sampled;

        redisDb *db = server.db+(current_db % server.dbnum);
        current_db++;
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            //获取expired_keys空间内的过期键
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots, sampling the key
             * space is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            if (slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            sampled = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > config_keys_per_loop)
                num = config_keys_per_loop;

            /* Here we access the low level representation of the hash table
             * for speed concerns: this makes this code coupled with dict.c,
             * but it hardly changed in ten years.
             *
             * Note that certain places of the hash table may be empty,
             * so we want also a stop condition about the number of
             * buckets that we scanned. However scanning for free buckets
             * is very fast: we are in the cache line scanning a sequential
             * array of NULL pointers, so we can scan a lot more buckets
             * than keys in the same time. */
            long max_buckets = num*20;
            long checked_buckets = 0;

            while (sampled < num && checked_buckets < max_buckets) {
                for (int table = 0; table < 2; table++) {
                    if (table == 1 && !dictIsRehashing(db->expires)) break;

                    unsigned long idx = db->expires_cursor;
                    idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]);
                    dictEntry *de = db->expires->ht_table[table][idx];
                    long long ttl;

                    /* Scan the current bucket of the current table. */
                    checked_buckets++;
                    while(de) {
                        /* Get the next entry now since this entry may get
                         * deleted. */
                        dictEntry *e = de;
                        de = de->next;

                        ttl = dictGetSignedIntegerVal(e)-now;
                        //遍历dict每个entry,如果过期就移除掉
                        if (activeExpireCycleTryExpire(db,e,now)) {
                            expired++;
                            /* Propagate the DEL command */
                            propagatePendingCommands();
                        }
                        if (ttl > 0) {
                            /* We want the average TTL of keys yet
                             * not expired. */
                            ttl_sum += ttl;
                            ttl_samples++;
                        }
                        sampled++;
                    }
                }
                db->expires_cursor++;
            }
            total_expired += expired;
            total_sampled += sampled;
        } while (sampled == 0 ||
                 (expired*100/sampled) > config_cycle_acceptable_stale);
    }

    serverAssert(server.core_propagates); /* This function should not be re-entrant */

    server.core_propagates = 0;
    server.in_nested_call--;

    elapsed = ustime()-start;
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}
//判断是否过期,如果没过期,啥也不做
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        robj *keyobj = createStringObject(key,sdslen(key));
        deleteExpiredKeyAndPropagate(db,keyobj);
        decrRefCount(keyobj);
        return 1;
    } else {
        return 0;
    }
}

//删除过期键
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
    mstime_t expire_latency;
    latencyStartMonitor(expire_latency);
    //惰性删除会评估删除的代价,如果代价大就交给后台线程做,否则主线程直接删除
    if (server.lazyfree_lazy_expire)
        dbAsyncDelete(db,keyobj);
    else
        dbSyncDelete(db,keyobj); //同步删除
    latencyEndMonitor(expire_latency);
    latencyAddSampleIfNeeded("expire-del",expire_latency);
    //更新每一个watched_key的client的flag为脏位
    signalModifiedKey(NULL, db, keyobj);
    //向follower传递这个删除命令
    propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
    server.stat_expiredkeys++;
}

int dbAsyncDelete(redisDb *db, robj *key) {
    return dbGenericDelete(db, key, 1);
}
//先逻辑删除,移除掉引用,
static int dbGenericDelete(redisDb *db, robj *key, int async) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val,db->id);
        /* We want to try to unblock any client using a blocking XREADGROUP */
        if (val->type == OBJ_STREAM)
            signalKeyAsReady(db,key,val->type);
            //惰性删除
        if (async) {
            freeObjAsync(key, val, db->id);
            dictSetVal(db->dict, de, NULL);
        }
        if (server.cluster_enabled) slotToKeyDelEntry(de, db);
        dictFreeUnlinkedEntry(db->dict,de);
        return 1;
    } else {
        return 0;
    }
}

处理过期键会迭代expired_keys字典空间下每一个entry,判断它的过期时间,如果过期就会移除掉引用关系,让这个对象变成不可到达的对象。另外,如果开启了惰性删除,则会评估删除的代价,要是代价小(空间占用情况)的话,主线程会直接释放,否则便会封装成job交给后台线程删除,如下。

代码语言:c
复制
void freeObjAsync(robj *key, robj *obj, int dbid) {
    size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
    /* Note that if the object is shared, to reclaim it now it is not
     * possible. This rarely happens, however sometimes the implementation
     * of parts of the Redis core may call incrRefCount() to protect
     * objects, and then call dbDelete(). */
    if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
    } else {
        decrRefCount(obj);
    }
}


void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_args.free_fn = free_fn;

    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args.free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
}
//
void bioSubmitJob(int type, bio_job *job) {
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    //唤醒惰性删除线程
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

后台线程主要做的是一些容忍延迟的task,包括lazy_free、aof_fsync、close等等。

代码语言:c
复制
/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */

3 处理待写事件

处理待写事件
处理待写事件
代码语言:c
复制
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage after we're done processing it in the io-threads */
        updateClientMemUsage(c);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c)) {
            installClientWriteHandler(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

处理待写事件流程比较简单,主要是以下几步:

  1. 将等待处理的client均匀分发给每个io_thread。
  2. 处理每个客户端,将响应数据刷新到socket buffer中发送出去。
  3. 更新客户端内存使用,清空server.clients_pending_write队列。

四、apiPoll

从epoll中拉取触发事件并放入到server.clients_pending_read中。

代码语言:c
复制
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    //state是epoll具体信息
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    //获取一批触发事件,并放入到state_events中
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            //fired保存此次触发事件的连接和类型
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}

fired中存放的是本次触发的事件,而连接具体的信息是在events中,需要以fd作为下标从events中拿取网络连接的具体信息。

代码语言:c
复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    //如果没有任何事件发生,则return
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        //将触发的事件放入到队列中
        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */
            int invert = fe->mask & AE_BARRIER;
            //执行建立连接时设置的handler,client设置的是ae_handler
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

主线程会对触发的事件执行注册时的函数,rfileProc、wfileProc注册的都是ae_handler。而ae_handler实际上是根据事件类型调用了connection的conn_handler、read_handler和write_handler。而对于read_handler,我们设置的是readQueryFromClient,之前也讲到过这个函数。

代码语言:c
复制
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

     //推迟客户端读写,放入到server的clients_pending_write、clients_pending_read列表上,
     //而server每次进入事件循环时都会将列表中的客户端分发到io_threads_list
    if (postponeClientRead(c)) return;

    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
        big_arg = 1;

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0) readlen = remaining;

        /* Master client needs expand the readlen when meet BIG_ARG(see #9100),
         * but doesn't need align to the next arg, we can read more data. */
        if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
            readlen = PROTO_IOBUF_LEN;
    }

    qblen = sdslen(c->querybuf);
    if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
        (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
        /* When reading a BIG_ARG we won't be reading more than that one arg
         * into the query buffer, so we don't need to pre-allocate more than we
         * need, so using the non-greedy growing. For an initial allocation of
         * the query buffer, we also don't wanna use the greedy growth, in order
         * to avoid collision with the RESIZE_THRESHOLD mechanism. */
        c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
    } else {
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

        /* Read as much as possible from the socket to save read(2) system calls. */
        readlen = sdsavail(c->querybuf);
    }
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            goto done;
        }
    } else if (nread == 0) {
        if (server.verbosity <= LL_VERBOSE) {
            sds info = catClientInfoString(sdsempty(), c);
            serverLog(LL_VERBOSE, "Client closed connection %s", info);
            sdsfree(info);
        }
        freeClientAsync(c);
        goto done;
    }

    sdsIncrLen(c->querybuf,nread);
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) {
        c->read_reploff += nread;
        atomicIncr(server.stat_net_repl_input_bytes, nread);
    } else {
        atomicIncr(server.stat_net_input_bytes, nread);
    }

    if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        goto done;
    }

    /* There is more data in the client input buffer, continue parsing it
     * and check if there is a full command to execute. */
    if (processInputBuffer(c) == C_ERR)
         c = NULL;

done:
    beforeNextClient(c);
}

readQueryFromClient有两个执行的时机,一个是在handleClientsWithPendingReadsUsingThreads时会执行,此时是将数据从socket buffer中读到client中。第二个时机就是apiPoll后执行触发事件的rfileProc时也执行了一次这个函数,这时并不会读取数据,而是将client推进server.clients_pending_read中,后面在handleClientsWithPendingReadsUsingThreads中再由io_thread_list读取数据。

image.png
image.png
代码语言:c
复制
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        //推迟就是将客户端放入到头部
        listAddNodeHead(server.clients_pending_read,c);
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

就是简单的将触发事件对应的client推进server.clients_pending_read。

五、小结

事件分发流程
事件分发流程

总结一下主要过程:

  1. 创建listen socket,并注册到eventloop,设置accept_handler。
  2. listen socket事件触发后就会accept client socket,并创建connection、client,然后将事件注册到eventloop中监听后续活动。
  3. 主循环通过beforeSleep Hook来事件分发。
    • 分发clients_pending_read,让io_thread_list从socket buffer拷贝数据到client buffer,并解析参数、命令,主线程自旋等待这个过程结束。
    • 主线程迭代clients_pending_read,执行每个client命令,并通过addReply将响应事件写入到clients_pending_write中。
    • 迭代expired_keys dict每个entry,判断是否过期,如果过期就释放,如果开启惰性删除就评估释放代价,代价小主线程直接释放,代价大则唤醒后台线程去释放。
    • 分发clients_pending_write给io_thread_list,将client buffer写入到socket buffer发送出去,完成响应过程。
  4. apiPoll调用epoll_wait拉取触发的事件放入到fired数组,然后根据eventfd执行每个connection的rfileProc、wfileProc,其实是ae_handler。其中读事件最终会调用readQueryFromClient-->postponeClientRead,将触发事件的client添加到clients_pending_read中用于分发。 这篇文章主要介绍这些,内容太多难以通过一篇文章讲完,以后再讲?。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景
  • 二、epoll使用案例
  • 三、redis事件注册
    • 1 数据结构
      • 2 创建listen socket
        • 3 将listen socket fd注册到eventloop中监听
          • 4 注册client读写事件
          • 三、beforeSleep
            • 1 handleClientsWithPendingReadsUsingThreads
              • 2 处理过期键
                • 3 处理待写事件
                • 四、apiPoll
                • 五、小结
                相关产品与服务
                云数据库 Redis
                腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档