Hiredis源码阅读(二)

Hiredis源码解析

上一篇介绍了Hiredis中的同步api以及回复解析api,这里紧接着介绍异步api。异步api需要与事件库(libevent、libev、ae一起工作)。

1、异步上下文

在同步api中,介绍了一个上下文结构redisContext,异步api也有一个类似的上下文结构redisAsyncContext,用于维护异步连接中的各种状态。源码如下所示:

typedef struct redisAsyncContext {
    redisContext c;
    struct {
        void *data;
        void (*addRead)(void *privdata);
        void (*delRead)(void *privdata);
        void (*addWrite)(void *privdata);
        void (*delWrite)(void *privdata);
        void (*cleanup)(void *privdata);
    } ev;
    redisDisconnectCallback *onDisconnect;
    redisConnectCallback *onConnect;
    redisCallbackList replies;
    struct {
        redisCallbackList invalid;
        struct dict *channels;
        struct dict *patterns;
    } sub;
} redisAsyncContext;

redisAsyncContext在redisContext的基础上增加了一些异步属性

  • ev:当Hiredis异步api与事件库(libev、libevent、ae)一起工作,用于注册和删除读写事件、清理相关的函数
  • onDisconnect:连接断开会调用的函数
  • onConnect:连接建立成功或失败都会调用
  • replies: 一个redisCallbackList结构,由结构会调结构redisCallback组成的单链表(当向redis发送普通命令时,会依次将该命令对应的会调结构追加到链表中,当redis回复命令时,会依次调用链表中每个redisCallback结构中的回调函数)

2、建立连接

异步api中建立连接函数redisAsyncConnect源码如下所示:

redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
    redisContext *c;
    redisAsyncContext *ac;
    c = redisConnectNonBlock(ip,port);
    if (c == NULL)
        return NULL;
    ac = redisAsyncInitialize(c);
    if (ac == NULL) {
        redisFree(c);
        return NULL;
    }
    __redisAsyncCopyError(ac);
    return ac;
}
  • 根据ip和port,调用redisConnectNonBlock函数向Redis服务器发起非阻塞的建立连接请求。这里redisConnectNonBlock中会调用redisContextInit初始化常规的redisContext上下文结构,再设置上下文标志为非阻塞,最后调用redisContextConnectTcp(如果connect没有立即成功,会不断轮训直到成功或是错误)。
  • 调用redisAsyncInitialize函数创建异步上下文结构redisAsyncContext。

函数redisAsyncSetConnectCallBack函数用于设置异步上下文中的连接建立回调函数,源码如下所示:

int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
    if (ac->onConnect == NULL) {
        ac->onConnect = fn;
        _EL_ADD_WRITE(ac);
        return REDIS_OK;
    }
    return REDIS_ERR;
}

如果之前没有设置过回调,首先会设置回调,然后调用_EL_ADD_WRITE注册可写事件(连接建立成功客户端就要向服务器发送命令,因此是一个可写事件)。如果是的是ae库,那么宏定义展开就是redisAeAddWrite函数(adapters目录中ae.h),函数源码如下所示:

static void redisAeAddWrite(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (!e->writing) {
        e->writing = 1;
        aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
    }
}

可写事件回到函数是redisAeWriteEvent,该函数调用redisAsyncHandleWrite实现,源码如下所示:

void redisAsyncHandleWrite(redisAsyncContext *ac) {
    ...
    if (!(c->flags & REDIS_CONNECTED)) {
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
            return;
        if (!(c->flags & REDIS_CONNECTED))
            return;
    }
    ...
}

该函数中,如果上下文标志中还没有设置REDIS_CONNECTED标记,说明目前连接还没有建立成功,因此调用__redisAsyncHandleConnect,源码如下所示:

static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    if (redisCheckSocketError(c) == REDIS_ERR) {
        if (errno == EINPROGRESS)
            return REDIS_OK;

        if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
        __redisAsyncDisconnect(ac);
        return REDIS_ERR;
    }
    c->flags |= REDIS_CONNECTED;
    if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
    return REDIS_OK;
}

该函数中调用redisCheckSocketError判断当前tcp是否建立连接成功(调用getsockopt判断连接状态)。

  • 如果返回REDIS_ERR并且errno为EINPROGRESS,这种情况表示tcp还在建立中,直接返回REDIS_OK,等待下次处理;其他情况都以失败处理,然后调用回调函数,最后调用__redisAsyncDisconnect断开连接并做清理工作。
  • 如果返回REDIS_OK,那么增加一个REDIS_CONNECTED标记,调用回调函数。

3、发送命令&解析回复

类似于同步api中发送命令的函数redisCommand,异步api中发送命令的函数是redisAsyncCommand,redisAsyncCommand会调用redisvFormatCommand和redisAsyncCommand。其中redisvFormatCommand解析用户输入命令,转换成统一的字符串cmd,然后再调用redisAsyncCommand函数,将cmd发送给redis,并记录相应的回调函数,__redisAsyncCommand源码如下所示:

static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
    ...
    if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
    cb.fn = fn;
    cb.privdata = privdata;
    ...
    if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
        c->flags |= REDIS_SUBSCRIBED;
        while ((p = nextArgument(p,&astr,&alen)) != NULL) {
            sname = sdsnewlen(astr,alen);
            if (pvariant)
                dictReplace(ac->sub.patterns,sname,&cb);
            else
                dictReplace(ac->sub.channels,sname,&cb);
        }
    } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
        if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
     } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
         c->flags |= REDIS_MONITORING;
         __redisPushCallback(&ac->replies,&cb);
    } else {
        if (c->flags & REDIS_SUBSCRIBED)
            __redisPushCallback(&ac->sub.invalid,&cb);
        else
            __redisPushCallback(&ac->replies,&cb);
    }
    __redisAppendCommand(c,cmd,len);
    _EL_ADD_WRITE(ac);
    return REDIS_OK;
}

该函数中,首先设置回调结构callback(封装privdata以及fn)。

接下来会解析用户输入的命令:

  • 如果用户输入命令是"subscribe"或者"psubscribe",将REDIS_SUBSCRIBED标记添加到上下文中,表示当前客户端进入订阅模式;然后循环解析后续相关的参数,把频道名以及匹配模式作为key,然后回调函数cb作为value,加入到异步上下文的字典中。
  • 如果用户输入命令是"unsubscribe",这种情况不需要记录回调函数。
  • 如果用户输入命令是"monitor",将REDIS_MONITORING标记增加到上下文中,表示客户端进入monitor模式,然后调用__redisPushCallBack,将回调结构cb追加到上下文的回调链表ac->replies中。
  • 其他情况,将回调追加到ac->replies中。

上面步骤目的都是为了记录回调函数,回调函数记录完毕,就可以调用__redisAppendCommand,将cmd追加到上下文的输出缓存中(c->obuf)。

最后调用__EL_ADD_WRITE注册可写事件。如果使用ae事件库,那么宏定义展开就是redisAeAddWrite函数,该函数的回调函数是redisAeWriteEvent,主要调用redisAsyncHandleWrite实现,源码如下所示:

void redisAsyncHandleWrite(redisAsyncContext *ac) {
    ...
    if (!(c->flags & REDIS_CONNECTED)) {
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
            return;
        if (!(c->flags & REDIS_CONNECTED))
            return;
    }
    if (redisBufferWrite(c,&done) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
    } else {
        if (!done)
            _EL_ADD_WRITE(ac);
        else
            _EL_DEL_WRITE(ac);
        _EL_ADD_READ(ac);
    }
}

如果连接没有成功建立,就重复之前提到的等待连接建立的过程。

连接成功建立之后,调用redisBufferWrite,将上下文中输出缓存的内容通过socket描述符发送出去。

成功发送之后,调用_EL_ADD_WRITE,删除可写事件,使用ae事件库,就是调用redisAeDelWrite函数删除注册的可写事件。

最后调用_EL_ADD_READ注册可读事件,使用ae事件库,就是调用redisAeAddRead函数注册可读事件,事件回调函数redisAeReadEvent,该函数主要是调用redisAsyncHandleRead,源码如下所示:

void redisAsyncHandleRead(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    if (!(c->flags & REDIS_CONNECTED)) {
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
            return;
        if (!(c->flags & REDIS_CONNECTED))
            return;
    }

    if (redisBufferRead(c) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
    } else {
        _EL_ADD_READ(ac);
        redisProcessCallbacks(ac);
    }
}

该函数中,同样也是首先处理连接未成功建立的情况,处理方式就不再重复。

连接建立成功之后,首先调用redisBufferRead,从socket中读取数据,并追加到解析器的输入缓存中,该函数在上一篇同步api中已经讲过,这里也不再重复。

读取成功之后,调用redisProcessCallbacks函数进行处理。该函数就是根据回复信息找到相应的回调结构,然后调用其中的回调函数,redisProcessCallbacks源码如下所示:

void redisProcessCallbacks(redisAsyncContext *ac) {
    ...
    while((status = redisGetReply(c,&reply)) == REDIS_OK) {
        if (reply == NULL) {
            if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
                && ac->replies.head == NULL) {
                __redisAsyncDisconnect(ac);
                return;
            }
            if(c->flags & REDIS_MONITORING) {
                __redisPushCallback(&ac->replies,&cb);
            }
            break;
        }
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }
            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
            if(c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac,reply,&cb);
        }

        if (cb.fn != NULL) {
            __redisRunCallback(ac,&cb,reply);
            c->reader->fn->freeObject(reply);
            if (c->flags & REDIS_FREEING) {
                __redisAsyncFree(ac);
                return;
            }
        } else {
            c->reader->fn->freeObject(reply);
        }
    }
    if (status != REDIS_OK)
        __redisAsyncDisconnect(ac);
}

函数循环利用redisGetReply,把解析器中的内容组织成一个redisReply结构树(输入缓存中很可能包含多个结构树),树的根节点通过参数reply返回。

循环中,如果reply为NULL,如果当前上下文标志中设置了REDIS_DISCONNECTING,说明之前的某个命令的回调函数,调用了redisAsyncDisconnect函数设置了该标记,那么可以执行__redisAsyncDisconnect断开连接,释放并清理内存;如果上下文标记中设置了REDIS_MONITORING,表示当前处于监听模式下,将上次取出的会调结构重新追加到ac->replies中,退出循环。

如果reply为非空,那么调用redisShiftCallback,尝试从链表中ac->replies中取出第一个回调结构cb。如果回复类型为REDIS_REPLY_ERROR,那么调用redisAsyncDisconnect断开连接。如果回复类型不是REDIS_REPLY_ERROR,则当前客户端只能处于订阅模式或是监控模式,调用redisGetSubscribeCallback,取出对应的cb,如果cb不为空,就调用redisRunCallback。

3、断开连接

调用redisAsyncDisconnect函数主动断开连接。该函数源码如下所示:

void redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    c->flags |= REDIS_DISCONNECTING;
    if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
        __redisAsyncDisconnect(ac);
}

该函数一般情况下都是在回调函数中被调用。当调用该函数时,并不会立即断开连接,该函数将REDIS_DISCONNECTING标记添加到上下文的标记位中,只有当输出缓存中的所有命令都发送完毕并收到回复调用回调函数之后(REDIS_IN_CALLBACK从上下文中标记中抹掉),才会调用__redisAsyncDisconnect函数真正断开连接,源码如下所示:

static void __redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    __redisAsyncCopyError(ac);

    if (ac->err == 0) {
        assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
    } else {
        c->flags |= REDIS_DISCONNECTING;
    }
    __redisAsyncFree(ac);
}

该函数中首先调用redisAsyncCopyError,得到异步上下文中的err,如果err为0,则说明是客户端主动断开连接,这种情况下,ac->replies应该是一个空链表;否则,将上下文标志位中的添加REDIS_DISCONNECTING标记,说明这是由于错误引起的连接断开。最后调用redisAsyncFree函数,调用所有的上下文中异步函数(reply指定为NULL),最后调用断开连接的会调用函数,关闭socket套接字并释放空间。

参考

https://github.com/redis/hiredis

http://blog.csdn.net/l1902090/article/details/38583663

http://blog.sina.com.cn/s/blog_bd7449900101jhjw.html

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JackeyGao的博客

Golang bongo 简单示例

16120
来自专栏魂祭心

原 WCF学习之旅----基础篇之Ente

27860
来自专栏施炯的IoT开发专栏

windows mobile窗口之间传递数据的方法

    在windows mobile上设计UI的时候,经常会碰到多个窗口的情况。有时候,我们需要将一个窗口中的用户输入信息反应到另一个窗口中去,这就涉及到窗口...

20690
来自专栏ASP.NET MVC5 后台权限管理系统

ASP.NET MVC5+EF6+EasyUI 后台管理系统(30)-本地化(多语言)

我们的系统有时要扩展到其他国家,或者地区,需要更多的语言环境,微软提供了一些解决方案,原始我们是用js来控制的,现在不需要了。 我们只要创建简单的资源文件,通过...

34870
来自专栏菩提树下的杨过

mongodb 速成笔记

以下环境为mac osx + jdk 1.8 + mongodb v3.2.3 一、安装 brew安装方式是mac下最简单的方式 brew update bre...

22850
来自专栏程序你好

ASP.NET Core RESTful Web服务开发教程

在本文中,我将逐步解释如何在ASP.NET Core中开发基于RESTful的Web服务应用程序。ASP.NET Core是微软最新发布的技术,比之前的WCF和...

71360
来自专栏木宛城主

ASP.NET那点不为人知的事(一)

我们上网时,在浏览器地址输入网址,按下回车,一张网页就呈现在我们眼前。这究竟发生了什么?对于一名优秀的Programmer来说,我想有必要一下熟悉浏览器---...

40280
来自专栏Phoenix的Android之旅

JSONObject 和 JsonObject 的区别

做Java开发经常要用Json来做数据的格式化解析,虽然在Android平台上我们习惯的使用 JSONObject, 但不知道你有没有这样的疑问,我们在impo...

25310
来自专栏大内老A

在ASP.NET MVC中通过URL路由实现对多语言的支持

对于一个需要支持多语言的Web应用,一个很常见的使用方式就是通过请求地址来控制界面呈现所基于的语言文化,比如我们在表示请求地址的URL中将上语言文化代码(比如e...

23260
来自专栏刘望舒

Android网络编程(二)HttpClient与HttpURLConnection

相关文章 Android网络编程(一)HTTP协议原理 前言 上一篇我们了解了HTTP协议原理,这一篇我们来讲讲Apache的HttpClient和Java的...

21470

扫码关注云+社区

领取腾讯云代金券