抱歉,你查看的文章已删除

redis网络通信模块源码分析(上)

导读

时下的业界,相对于传统的关系型数据库,以key-value思想实现的nosql内存数据库也是非常流行,而提到内存数据库,很多人第一反应就是redis。确实,redis以其高效的性能和优雅的实现成为众多内存数据库中的翘楚。前面章节介绍了单个服务器的基本结构,这个章节我们再来一个实战演习,即以redis为例来讲解实际项目的中服务器结构是怎样的。当然,本文介绍的角度与前面的章节思路不一样,前面的章节是先给结论,然后再加以论证,而本文则是假设预先不清楚redis网络通信层的结构,结合gdb调试,以探究的方式逐步搞清楚redis的网络通信模块结构。

redis源码下载与编译

redis的最新源码下载地址可以在redis官网(https://redis.io/)获得。笔者使用的是CentOS 7.0系统,因此使用wget命令将redis源码文件下载下来:

[root@localhost gdbtest]# wget http://download.redis.io/releases/redis-4.0.11.tar.gz
--2018-09-08 13:08:41--  http://download.redis.io/releases/redis-4.0.11.tar.gz
Resolving download.redis.io (download.redis.io)... 109.74.203.151
Connecting to download.redis.io (download.redis.io)|109.74.203.151|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1739656 (1.7M) [application/x-gzip]
Saving to: ‘redis-4.0.11.tar.gz’

54% [==================================================================>                                                         ] 940,876     65.6KB/s  eta 9s

解压:

[root@localhost gdbtest]# tar zxvf redis-4.0.11.tar.gz 

进入生成的redis-4.0.11目录使用makefile进行编译:

[root@localhost gdbtest]# cd redis-4.0.11
[root@localhost redis-4.0.11]# make -j 4

编译成功后,会在src目录下生成多个可执行程序,其中redis-serverredis-cli是我们需要即将调试的程序。

我们可以进入src目录,使用gdb启动redis-server这个程序:

[root@localhost src]# gdb redis-server 
Reading symbols from /root/redis-4.0.9/src/redis-server...done.
(gdb) r
Starting program: /root/redis-4.0.9/src/redis-server 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
31212:C 17 Sep 11:59:50.781 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31212:C 17 Sep 11:59:50.781 # Redis version=4.0.9, bits=64, commit=00000000, modified=0, pid=31212, just started
31212:C 17 Sep 11:59:50.781 # Warning: no config file specified, using the default config. In order to specify a config file use /root/redis-4.0.9/src/redis-server /path/to/redis.conf
31212:M 17 Sep 11:59:50.781 * Increased maximum number of open files to 10032 (it was originally set to 1024).
[New Thread 0x7ffff07ff700 (LWP 31216)]
[New Thread 0x7fffefffe700 (LWP 31217)]
[New Thread 0x7fffef7fd700 (LWP 31218)]
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 4.0.9 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 31212
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

31212:M 17 Sep 11:59:50.793 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
31212:M 17 Sep 11:59:50.793 # Server initialized
31212:M 17 Sep 11:59:50.793 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
31212:M 17 Sep 11:59:50.794 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
31212:M 17 Sep 11:59:50.794 * DB loaded from disk: 0.000 seconds
31212:M 17 Sep 11:59:50.794 * Ready to accept connections

以上是redis-server的启动成功后的画面。

我们再开一个session,再次进入redis源码所在的src目录,然后使用gdb启动redis客户端redis-cli:

[root@localhost src]# gdb redis-cli
Reading symbols from /root/redis-4.0.9/src/redis-cli...done.
(gdb) r
Starting program: /root/redis-4.0.9/src/redis-cli 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
127.0.0.1:6379> 

以上是redis-cli启动成功后的画面。

通信示例

我们本章节的目的是为了学习和研究redis的网络通信模块,我们并不关心redis其他一些内容,所以为了说明问题方便,我们使用的一个简单的通信实例,即通过redis-cli产生一个key为"hello",值为"world"的key-value数据,然后得到redis-server的响应。我们通过这样一个实例来研究redis的网络通信模块。

127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> 

探究redis-server端的网络通信模块

我们先研究redis-server端的通信模块。

倾听socket初始化工作

通过前面章节的介绍,我们知道网络通信的本质在应用层上的大致流程如下:

1.服务器端创建侦听socket

2.将侦听socket绑定到需要的ip地址和端口上(调用Socket API bind函数)

3.侦听(调用socket API listen函数)

4.无限等待客户端连接到来,调用Socket API accept函数接受客户端连接,并产生一个与该客户端对应的客户端socket。

5.处理在客户端socket上的网络数据的收发,必要时关闭该socket。

根据上面的流程,我们先来探究流程中的1、2、3这三步。由于redis-server默认对客户端的端口号是6379,我们可以使用这个信息作为依据。

全局搜索一下redis的代码,我们寻找调用了bind函数的代码,经过过滤和筛选,我们确定了位于anet.c的anetListen函数。

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

用gdb的b命令在这个函数上加个断点,然后重新运行redis-server:

(gdb) b anetListen
Breakpoint 1 at 0x426cd0: file anet.c, line 440.
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-4.0.9/src/redis-server 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
31546:C 17 Sep 14:20:43.861 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31546:C 17 Sep 14:20:43.861 # Redis version=4.0.9, bits=64, commit=00000000, modified=0, pid=31546, just started
31546:C 17 Sep 14:20:43.861 # Warning: no config file specified, using the default config. In order to specify a config file use /root/redis-4.0.9/src/redis-server /path/to/redis.conf
31546:M 17 Sep 14:20:43.862 * Increased maximum number of open files to 10032 (it was originally set to 1024).

Breakpoint 1, anetListen (err=0x745bb0 <server+560> "", s=10, sa=0x75dfe0, len=28, backlog=511) at anet.c:440
440     static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {

当gdb中断在这个函数时,我们使用bt命令查看一下此时的调用堆栈:

(gdb) bt
#0  anetListen (err=0x745bb0 <server+560> "", s=10, sa=0x75dfe0, len=28, backlog=511) at anet.c:440
#1  0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)
    at anet.c:487
#2  0x000000000042792d in anetTcp6Server (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, backlog=<optimized out>)
    at anet.c:510
#3  0x000000000042b01f in listenToPort (port=6379, fds=fds@entry=0x745ae4 <server+356>, count=count@entry=0x745b24 <server+420>) at server.c:1728
#4  0x000000000042f917 in initServer () at server.c:1852
#5  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

通过这个堆栈,结合堆栈#2的6379主线程(因为从堆栈上看,最顶层堆栈是main**函数)中进行的。

我们看下堆栈#1处的代码:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
        goto error;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

将堆栈切换至#1,然后输入info arg可以查看传入给这个函数的参数:

(gdb) f 1
#1  0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)
    at anet.c:487
487             if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
(gdb) info args
err = 0x745bb0 <server+560> ""
port = 6379
bindaddr = 0x0
af = 10
backlog = 511

这里使用系统API getaddrinfo来解析得到当前主机的ip地址和端口信息。这里没有选择使用gethostbyname这个API是因为gethostbyname仅能用于解析ipv4相关的主机信息,而getaddrinfo既可以用于ipv4也可以用于ipv6,这个函数的签名如下:

int getaddrinfo(const char *node, const char *service,
                       const struct addrinfo *hints,
                       struct addrinfo **res);

您可以在linux man手册上查看这个函数的具体用法。通常服务器端在调用getaddrinfo之前,将hints参数的ai_flags设置AI_PASSIVE,用于bind;主机名nodename通常会设置为NULL,返回通配地址[::]。 当然,客户端调用getaddrinfo时,hints参数的ai_flags一般不设置AI_PASSIVE,但是主机名node和服务名service(更愿意称之为端口)则应该不为空。

解析完毕协议信息后,利用得到的协议信息创建侦听socket,并开启该socket的reuseAddr选项。然后调用anetListen函数,在该函数中先bind后listen。至此,redis-server就可以在6379端口上接受客户端连接了。

接受客户端连接

同样的道理,我们要研究redis-server如何接受客户端连接,我们只要搜索socket API accept函数即可。

经定位,我们最终在anet.c文件中找到anetGenericAccept函数:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

我们用b命令在这个函数处加个断点,然后重新运行redis-server。一直到程序全部运行起来,gdb都没有触发该断点,我们新打开一个redis-cli,以模拟新客户端连接到redis-server上的行为。断点触发了,我们查看下此时的调用堆栈。

Breakpoint 2, anetGenericAccept (err=0x745bb0 <server+560> "", s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet.c:531
531     static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
(gdb) bt
#0  anetGenericAccept (err=0x745bb0 <server+560> "", s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet.c:531
#1  0x0000000000427a1d in anetTcpAccept (err=<optimized out>, s=s@entry=11, ip=ip@entry=0x7fffffffe370 "\317P\237[", ip_len=ip_len@entry=46, 
    port=port@entry=0x7fffffffe36c) at anet.c:552
#2  0x0000000000437fb1 in acceptTcpHandler (el=<optimized out>, fd=11, privdata=<optimized out>, mask=<optimized out>) at networking.c:689
#3  0x00000000004267f0 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff083a0a0, flags=flags@entry=11) at ae.c:440
#4  0x0000000000426adb in aeMain (eventLoop=0x7ffff083a0a0) at ae.c:498
#5  0x00000000004238ef in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3894

分析这个调用堆栈,我们梳理一下这个调用流程。在main函数的initServer函数中创建侦听socket、绑定地址然后开启侦听,接着调用aeMain函数启动一个循环不断地处理“事件”。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

循环的退出条件是eventLoop->stop为1。事件处理的代码如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

该段代码先通过flag参数检查是否有件事需要处理。如果有定时器事件(AE_TIME_EVENTS标志),则寻找最近要到期的定时器。

/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

这段代码有详细的注释,也非常好理解。代码作者注释告诉我们,由于这里的定时器集合是无序的,所以需要遍历一下这个链表,算法复杂度是O(n)。同时,注释中也“暗示”了我们将来redis在这块的优化方向,即把这个链表按到期时间从小到大排下序,这样链表的头部就是我们要的最近时间点的定时器对象,算法复杂度是O(1)。或者使用redis中的skiplist,算法复杂度是O(log(N))。

接着获取当前系统时间(aeGetTime(&now_sec, &now_ms);)将最早要到期的定时器减去当前系统时间获得一个间隔。这个时间间隔作为numevents = aeApiPoll(eventLoop, tvp);调用的参数,aeApiPoll()在linux平台上使用的epoll技术,redis在这个IO复用技术上在不同的操作系统平台上使用不同的系统函数,在Windows系统上使用select,在Mac系统上使用kqueue。这里我们重点看下linux平台下的实现:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

epoll_wait这个函数的签名如下:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

最后一个参数timeout的设置非常有讲究,如果传入进来的tvp是NULL,根据上文的分析,说明没有定时器事件,则将等待时间设置为-1,这会让epoll_wait无限期的挂起来,直到有事件时才会被唤醒。挂起的好处就是不浪费cpu时间片。反之,将timeout设置成最近的定时器事件间隔,将epoll_wait的等待时间设置为最近的定时器事件来临的时间间隔,可以及时唤醒epoll_wait,这样程序流可以尽快处理这个到期的定时器事件(下文会介绍)。

对于epoll_wait这种系统调用,所有的fd(对于网络通信,也叫socket)信息包括侦听fd和普通客户端fd都记录在事件循环对象aeEventLoop的apidata字段中,当某个fd上有事件触发时,从apidata中找到该fd,并把事件类型(mask字段)一起记录到aeEventLoop的fired字段中去。我们先把这个流程介绍完,再介绍epoll_wait函数中使用的epfd是在何时何地创建的,侦听fd、客户端fd是如何挂载到epfd上去的。

在得到了有事件的fd以后,接下来就要处理这些事件了。在主循环aeProcessEvents中从aeEventLoop对象的fired数组中取出上一步记录的fd,然后根据事件类型(读事件和写事件)分别进行处理。

for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

读事件字段rfileProc和写事件字段wfileProc都是函数指针,在程序早期设置好,这里直接调用就可以了。

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

epfd的创建

我们通过搜索关键字epoll_create在ae_poll.c文件中找到epfd的创建函数aeApiCreate。

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

使用gdb的b命令在这个函数上加个断点,然后使用run命令重新运行一下redis-server,触发断点,使用bt命令查看此时的调用堆栈。啊哈,发现epfd也是在上文介绍的initServer函数中创建的。

(gdb) bt
#0  aeCreateEventLoop (setsize=10128) at ae.c:79
#1  0x000000000042f542 in initServer () at server.c:1841
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

在aeCreateEventLoop中不仅创建了epfd,也创建了整个事件循环需要的aeEventLoop对象,并把这个对象记录在redis的一个全局变量的el字段中。这个全局变量叫server,这是一个结构体类型。其定义如下:

//位于server.c文件中
struct redisServer server; /* Server global state */


//位于server.h文件中
struct redisServer {
    /* General */
    //省略部分字段...
    aeEventLoop *el;
    unsigned int lruclock;      /* Clock for LRU eviction */
    //太长了,省略部分字段...
}

侦听fd与客户端fd是如何挂载到epfd上去的

同样的方式,要把一个fd挂载到epfd上去,需要调用系统API epoll_ctl,全部搜索一下这个函数名。在文件ae_epoll.c中我们找到aeApiAddEvent函数:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

当把一个fd绑定到epfd上去的时候,先从eventLoop(aeEventLoop类型)中寻找是否存在已关注的件类型,如果已经有了,说明使用epoll_ctl是更改已绑定的fd事件类型(EPOLL_CTL_MOD),否则就是添加fd到epfd上。

我们在aeApiAddEvent加个断点,再重启下redis-server。触发断点后的调用堆栈如下:

#0  aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1, proc=0x437f50 <acceptTcpHandler>, clientData=clientData@entry=0x0) at ae.c:145
#1  0x000000000042f83b in initServer () at server.c:1927
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

同样在initServer函数中。我们结合上文分析的侦听fd的创建过程,去掉无关代码,抽出这个函数的主脉络得到如下伪代码:

void initServer(void) {

    //记录程序进程ID   
    server.pid = getpid();

    //创建程序的aeEventLoop对象和epfd对象
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    //创建侦听fd
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)

    //将侦听fd设置为非阻塞的
    anetNonBlock(NULL,server.sofd);

    //创建redis的定时器,用于执行定时任务cron
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR

    //将侦听fd绑定到epfd上去
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
     aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

    //创建一个管道,用于在需要时去唤醒epoll_wait挂起的整个EventLoop
    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR)
}

注意:这里所说的“主脉络”是指我们这里关心的网络通信的主脉络,不代表这个函数中其他代码就不是主要的。

我们如何验证这个断点处挂载到epfd上的fd就是侦听fd呢?这个很容易,创建侦听fd时,我们用gdb记录下这个fd的值。例如,笔者的电脑某次运行时,侦听fd的值是15。如下图(调试工具用的是cgdb):

然后在运行程序至绑定fd的地方,确认一下绑定到epfd上的fd值:

这里的fd值也是15,说明绑定的fd是侦听fd。当然在绑定侦听fd时,同时也指定了只关注可读事件,并设置时间回调函数为acceptTcpHandler。对于侦听fd,我们一般只要关注可读事件就可以了,一般当触发可读事件,说明有新的连接到来。

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

acceptTcpHandler函数定义如下(位于文件networking.c中):


void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

anetTcpAccept函数中调用就是我们上文中说的anetGenericAccept函数了。

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

至此,这段流程总算连起来了。我们在acceptTcpHandler上加个断点,然后重新运行一下redis-server,再开个redis-cli去连接redis-server。看看是否能触发该断点,如果能触发该断点,说明我们的分析时正确的。

经验证,确实触发了该断点。

在acceptTcpHandler中成功接受新连接后,产生客户端fd,然后调用acceptCommonHandler函数,在该函数中调用createClient函数,在createClient函数中先将客户端fd设置成非阻塞的,然后将该fd关联到epfd上去,同时记录到整个程序的aeEventLoop对象上。注意,这里客户端fd绑定到epfd上时也只关注可读事件。我们将无关的代码去掉,然后抽出我们关注的部分,整理后如下(位于networking.c文件中):

client *createClient(int fd) {
    //将客户端fd设置成非阻塞的
    anetNonBlock(NULL,fd);
    //启用tcp NoDelay选项
    anetEnableTcpNoDelay(NULL,fd);
    //根据配置,决定是否启动tcpkeepalive选项
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    //将客户端fd绑定到epfd,同时记录到aeEventLoop上,关注的事件为AE_READABLE,回调函数为
    //readQueryFromClient
    aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR

    return c;
}

本文分享自微信公众号 - 高性能服务器开发(easyserverdev)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

高性能服务器开发

195 篇文章142 人订阅

相关文章

来自专栏Kirito的技术分享

深入理解RPC之传输篇

RPC 被称为“远程过程调用”,表明了一个方法调用会跨越网络,跨越进程,所以传输层是不可或缺的。一说到网络传输,一堆名词就蹦了出来:TCP、UDP、HTTP,同...

1.1K70
来自专栏日常分享

JavaWeb 基于Session的用户登陆注销实现

  通过Session来存储用户的部分登陆信息来验证用户是否在线,这应该时最容易实现的一种Web端方案,本文以SSM(Spring、SpringMVC、myBa...

92810
来自专栏Java架构沉思录

你真的懂Mybatis缓存机制吗

Mybatis的一级缓存是指Session缓存。一级缓存的作用域默认是一个SqlSession。Mybatis默认开启一级缓存。 也就是在同一个SqlSessi...

1.9K50
来自专栏祝威廉

Kafka Zero-Copy 使用分析

Kafka 我个人感觉是性能优化的典范。而且使用Scala开发,代码写的也很漂亮的。重点我觉得有四个

32520
来自专栏YG小书屋

ElasticSearch 5.6源码解析HTTP/TCP请求

59630
来自专栏zhisheng

渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(下)

上篇文章写完了 ES 流程启动的一部分,main 方法都入口,以及创建 Elasticsearch 运行的必须环境以及相关配置,接着就是创建该环境的节点了。

18730
来自专栏码农阿宇

国内开源社区巨作AspectCore-Framework入门

在软件业,AOP为Aspect Oriented Programming的缩写,意为:面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技...

31010
来自专栏Java后端技术

Java发送邮件初窥

  最近朋友的公司有用到这个功能,之前对这一块也不是很熟悉,就和他一起解决出现的异常的同时,也初窥一下使用Apache Common Email组件进行邮件发送...

17520
来自专栏JadePeng的技术博客

XNginx - nginx 集群可视化管理工具

之前团队的nginx管理,都是运维同学每次去修改配置文件,然后重启,非常不方便,一直想找一个可以方便管理nginx集群的工具,翻遍web,未寻到可用之物,于是自...

2.4K40
来自专栏Felix的技术分享

《一个操作系统的实现》笔记(5)--内核雏形

23040

扫码关注云+社区

领取腾讯云代金券