上一篇介绍了Hiredis中的同步api以及回复解析api,这里紧接着介绍异步api。异步api需要与事件库(libevent、libev、ae一起工作)。
在同步api中,介绍了一个上下文结构redisContext,异步api也有一个类似的上下文结构redisAsyncContext,用于维护异步连接中的各种状态。源码如下所示:
typedef struct redisAsyncContext {
redisContext c;
struct {
void *data;
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
} ev;
redisDisconnectCallback *onDisconnect;
redisConnectCallback *onConnect;
redisCallbackList replies;
struct {
redisCallbackList invalid;
struct dict *channels;
struct dict *patterns;
} sub;
} redisAsyncContext;
redisAsyncContext在redisContext的基础上增加了一些异步属性
异步api中建立连接函数redisAsyncConnect源码如下所示:
redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
redisContext *c;
redisAsyncContext *ac;
c = redisConnectNonBlock(ip,port);
if (c == NULL)
return NULL;
ac = redisAsyncInitialize(c);
if (ac == NULL) {
redisFree(c);
return NULL;
}
__redisAsyncCopyError(ac);
return ac;
}
函数redisAsyncSetConnectCallBack函数用于设置异步上下文中的连接建立回调函数,源码如下所示:
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
if (ac->onConnect == NULL) {
ac->onConnect = fn;
_EL_ADD_WRITE(ac);
return REDIS_OK;
}
return REDIS_ERR;
}
如果之前没有设置过回调,首先会设置回调,然后调用_EL_ADD_WRITE注册可写事件(连接建立成功客户端就要向服务器发送命令,因此是一个可写事件)。如果是的是ae库,那么宏定义展开就是redisAeAddWrite函数(adapters目录中ae.h),函数源码如下所示:
static void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->writing) {
e->writing = 1;
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
}
}
可写事件回到函数是redisAeWriteEvent,该函数调用redisAsyncHandleWrite实现,源码如下所示:
void redisAsyncHandleWrite(redisAsyncContext *ac) {
...
if (!(c->flags & REDIS_CONNECTED)) {
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
if (!(c->flags & REDIS_CONNECTED))
return;
}
...
}
该函数中,如果上下文标志中还没有设置REDIS_CONNECTED标记,说明目前连接还没有建立成功,因此调用__redisAsyncHandleConnect,源码如下所示:
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (redisCheckSocketError(c) == REDIS_ERR) {
if (errno == EINPROGRESS)
return REDIS_OK;
if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
__redisAsyncDisconnect(ac);
return REDIS_ERR;
}
c->flags |= REDIS_CONNECTED;
if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
return REDIS_OK;
}
该函数中调用redisCheckSocketError判断当前tcp是否建立连接成功(调用getsockopt判断连接状态)。
类似于同步api中发送命令的函数redisCommand,异步api中发送命令的函数是redisAsyncCommand,redisAsyncCommand会调用redisvFormatCommand和redisAsyncCommand。其中redisvFormatCommand解析用户输入命令,转换成统一的字符串cmd,然后再调用redisAsyncCommand函数,将cmd发送给redis,并记录相应的回调函数,__redisAsyncCommand源码如下所示:
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
...
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
cb.fn = fn;
cb.privdata = privdata;
...
if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
c->flags |= REDIS_SUBSCRIBED;
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen);
if (pvariant)
dictReplace(ac->sub.patterns,sname,&cb);
else
dictReplace(ac->sub.channels,sname,&cb);
}
} else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
} else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
c->flags |= REDIS_MONITORING;
__redisPushCallback(&ac->replies,&cb);
} else {
if (c->flags & REDIS_SUBSCRIBED)
__redisPushCallback(&ac->sub.invalid,&cb);
else
__redisPushCallback(&ac->replies,&cb);
}
__redisAppendCommand(c,cmd,len);
_EL_ADD_WRITE(ac);
return REDIS_OK;
}
该函数中,首先设置回调结构callback(封装privdata以及fn)。
接下来会解析用户输入的命令:
上面步骤目的都是为了记录回调函数,回调函数记录完毕,就可以调用__redisAppendCommand,将cmd追加到上下文的输出缓存中(c->obuf)。
最后调用__EL_ADD_WRITE注册可写事件。如果使用ae事件库,那么宏定义展开就是redisAeAddWrite函数,该函数的回调函数是redisAeWriteEvent,主要调用redisAsyncHandleWrite实现,源码如下所示:
void redisAsyncHandleWrite(redisAsyncContext *ac) {
...
if (!(c->flags & REDIS_CONNECTED)) {
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
if (!(c->flags & REDIS_CONNECTED))
return;
}
if (redisBufferWrite(c,&done) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
if (!done)
_EL_ADD_WRITE(ac);
else
_EL_DEL_WRITE(ac);
_EL_ADD_READ(ac);
}
}
如果连接没有成功建立,就重复之前提到的等待连接建立的过程。
连接成功建立之后,调用redisBufferWrite,将上下文中输出缓存的内容通过socket描述符发送出去。
成功发送之后,调用_EL_ADD_WRITE,删除可写事件,使用ae事件库,就是调用redisAeDelWrite函数删除注册的可写事件。
最后调用_EL_ADD_READ注册可读事件,使用ae事件库,就是调用redisAeAddRead函数注册可读事件,事件回调函数redisAeReadEvent,该函数主要是调用redisAsyncHandleRead,源码如下所示:
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (!(c->flags & REDIS_CONNECTED)) {
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
if (!(c->flags & REDIS_CONNECTED))
return;
}
if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
_EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}
}
该函数中,同样也是首先处理连接未成功建立的情况,处理方式就不再重复。
连接建立成功之后,首先调用redisBufferRead,从socket中读取数据,并追加到解析器的输入缓存中,该函数在上一篇同步api中已经讲过,这里也不再重复。
读取成功之后,调用redisProcessCallbacks函数进行处理。该函数就是根据回复信息找到相应的回调结构,然后调用其中的回调函数,redisProcessCallbacks源码如下所示:
void redisProcessCallbacks(redisAsyncContext *ac) {
...
while((status = redisGetReply(c,&reply)) == REDIS_OK) {
if (reply == NULL) {
if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
&& ac->replies.head == NULL) {
__redisAsyncDisconnect(ac);
return;
}
if(c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies,&cb);
}
break;
}
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
c->err = REDIS_ERR_OTHER;
snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
c->reader->fn->freeObject(reply);
__redisAsyncDisconnect(ac);
return;
}
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
if(c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {
__redisRunCallback(ac,&cb,reply);
c->reader->fn->freeObject(reply);
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else {
c->reader->fn->freeObject(reply);
}
}
if (status != REDIS_OK)
__redisAsyncDisconnect(ac);
}
函数循环利用redisGetReply,把解析器中的内容组织成一个redisReply结构树(输入缓存中很可能包含多个结构树),树的根节点通过参数reply返回。
循环中,如果reply为NULL,如果当前上下文标志中设置了REDIS_DISCONNECTING,说明之前的某个命令的回调函数,调用了redisAsyncDisconnect函数设置了该标记,那么可以执行__redisAsyncDisconnect断开连接,释放并清理内存;如果上下文标记中设置了REDIS_MONITORING,表示当前处于监听模式下,将上次取出的会调结构重新追加到ac->replies中,退出循环。
如果reply为非空,那么调用redisShiftCallback,尝试从链表中ac->replies中取出第一个回调结构cb。如果回复类型为REDIS_REPLY_ERROR,那么调用redisAsyncDisconnect断开连接。如果回复类型不是REDIS_REPLY_ERROR,则当前客户端只能处于订阅模式或是监控模式,调用redisGetSubscribeCallback,取出对应的cb,如果cb不为空,就调用redisRunCallback。
调用redisAsyncDisconnect函数主动断开连接。该函数源码如下所示:
void redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_DISCONNECTING;
if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
__redisAsyncDisconnect(ac);
}
该函数一般情况下都是在回调函数中被调用。当调用该函数时,并不会立即断开连接,该函数将REDIS_DISCONNECTING标记添加到上下文的标记位中,只有当输出缓存中的所有命令都发送完毕并收到回复调用回调函数之后(REDIS_IN_CALLBACK从上下文中标记中抹掉),才会调用__redisAsyncDisconnect函数真正断开连接,源码如下所示:
static void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
__redisAsyncCopyError(ac);
if (ac->err == 0) {
assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
} else {
c->flags |= REDIS_DISCONNECTING;
}
__redisAsyncFree(ac);
}
该函数中首先调用redisAsyncCopyError,得到异步上下文中的err,如果err为0,则说明是客户端主动断开连接,这种情况下,ac->replies应该是一个空链表;否则,将上下文标志位中的添加REDIS_DISCONNECTING标记,说明这是由于错误引起的连接断开。最后调用redisAsyncFree函数,调用所有的上下文中异步函数(reply指定为NULL),最后调用断开连接的会调用函数,关闭socket套接字并释放空间。
https://github.com/redis/hiredis
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。