Hiredis库主要包含三类API:同步api、异步api以及回复解析api。首先介绍一下同步api以及回复解析api。
函数原型:
redisContext *redisConnect(const char *ip, int port);
redisConnect函数用来创建一个上下文结构redisContext,并向reids服务器发起连接请求。源码如下所示:
redisContext *redisConnect(const char *ip, int port) {
redisContext *c;
c = redisContextInit();
if (c == NULL)
return NULL;
c->flags |= REDIS_BLOCK;
redisContextConnectTcp(c,ip,port,NULL);
return c;
}
其中redisConext用来保存与redis服务器连接状态相关信息、输出缓冲区以及回复解析器。结构如下所示:
typedef struct redisContext {
int err;
char errstr[128];
int fd;
int flags;
char *obuf;
redisReader *reader;
...
} redisContext;
其中fd表示与redis服务器建立连接的socket描述符;而flag表示客户端标志位,表示客户端当前的状态;obuf用来保存输出缓存,用户调用reidsCommand向redis发送命令时,命令字符串首先会被追加到obuf中;reader是一个回复解析器,后续会介绍。
函数原型:
void *redisCommand(redisContext *c, const char *format, ...);
redisCommand函数返回NULL表示有错误发生,可以通过检查redisContext中的err得到错误类型;如果执行完成,则返回值是一个redisReply指针,包含了Redis的恢复信息。
redisCommand主要通过redisvCommand实现,而redisvCommand主要是通过redisvAppendCommand和__redisBlockForReply两个实现。
redisvAppendCommand源码如下所示:
int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
...
len = redisvFormatCommand(&cmd,format,ap);
if (len == -1) {
__redisSetError(c,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
} else if (len == -2) {
__redisSetError(c,REDIS_ERR_OTHER,"Invalid format string");
return REDIS_ERR;
}
if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
free(cmd);
return REDIS_ERR;
}
...
}
redisvAppendCommand函数作用是解析用户的输入,并将用户输入的命令字符串转换成redis统一的格式,暂存到redisContext.obuf中。
__redisBlockForReply源码如下所示:
void *__redisBlockForReply(redisContext *c) {
void *reply;
if (c->flags & REDIS_BLOCK) {
if (redisGetReply(c,&reply) != REDIS_OK)
return NULL;
return reply;
}
return NULL;
}
int redisGetReply(redisContext *c, void **reply) {
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
if (aux == NULL && c->flags & REDIS_BLOCK) {
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
if (reply != NULL) *reply = aux;
return REDIS_OK;
}
redisGetReply中,首先是循环调用redisBufferWrite,将输出c->obuf中的所有内容发送给redis,然后循环调用redisBufferRead,读取redis的回复,调用redisGetReplyFromReader对回复信息进行解析。
redisBufferRead函数主要是从socket读取数据到buf中,然后通过函数redisReaderFeed,将bug内容追加到解析器的输入缓存中。
解析器结构redisReader,源码如下所示:
typedef struct redisReader {
char *buf;
size_t pos;
size_t len;
size_t maxbuf;
...
} redisReader;
buf就是输入缓存,redisReaderFeed函数把读取到的redis恢复信息都暂存于此;len表示当前缓存的容量;pos表示当前缓存的读取索引(每次读取输入缓存时,都从reader->buf + reader->pos处开始读取,读取数据之后,会增加pos的值);maxbuf表示输入缓存最大的允许闲置空间,当buf的空闲空间大于maxbuf时,就会buf,重新申请空间(默认值是16k)。
这里redisReaderFeed就是从socket中读取redis回复信息,追加到解析器缓存中。
上述redisGetReply函数中,将redis回复信息追加到解析器输入缓存后,就会调用redisGetReplyFromReader对解析器的输入缓存进行信息解析,最终以redisReply结构呈现。源码如下所示:
typedef struct redisReply {
int type;
long long integer;
int len;
char *str;
size_t elements;
struct redisReply **element;
} redisReply;
其中type表示redis回复信息的类型,其中基本类型主要有下面几种:
调用redisReaderGetReply解析之后,最终会形成redisReply结构树,非叶子节点只能是REDIS_REPLY_ARRAY类型,叶子节点只能是上述提到的基本类型。
例如,redis回复信息是*3\r\n*3\r\n:11\r\n:12\r\n:13\r\n*3\r\n:21\r\n:22\r\n:23\r\n:31\r\n
,那么最终形成的树如下所示:
使用redisReadTask任务结构来解析回复信息,构建每个redisReply结构节点,填充到树中合适的位置。源码如下所示:
typedef struct redisReader {
...
redisReadTask rstack[9];
int ridx;
void *reply;
redisReplyObjectFunctions *fn;
void *privdata;
} redisReader;
redisReadTask结构数组rstak大小是9;其中rtask0表示redisReply结构树中的根节点;ridx表示当前处理第几层节点;fn包含了用于生成各种类型redisReply结构的函数;reply指向redisReply结构树中的根节点。
redisReadTask结构如下所示:
typedef struct redisReadTask {
int type;
int elements;
int idx;
void *obj;
struct redisReadTask *parent;
void *privdata;
} redisReadTask;
这个地方有点绕
redisReaderGetReply源码如下所示:
int redisReaderGetReply(redisReader *r, void **reply) {
if (reply != NULL)
*reply = NULL;
if (r->err)
return REDIS_ERR;
if (r->len == 0)
return REDIS_OK;
// 初始化操作
while (r->ridx >= 0)
if (processItem(r) != REDIS_OK)
break;
if (r->err)
return REDIS_ERR;
if (r->pos >= 1024) {
sdsrange(r->buf,r->pos,-1);
r->pos = 0;
r->len = sdslen(r->buf);
}
if (r->ridx == -1) {
if (reply != NULL)
*reply = r->reply;
r->reply = NULL;
}
return REDIS_OK;
}
回复解析主要步骤:
processItem函数首先得到当前构建节点的结构redisReadTask *cur = &(r->rstack[r->ridx])
,然后从输入缓存中读取首个字符,用来判断回复信息的类型,保存到cur->type中。
根据得到的回复类型信息,调用不同的函数处理不同的类型。这里重点看一下处理数组类型的processMultiBulkItem的实现逻辑:
static int processMultiBulkItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
...
if (r->ridx == 8)
return REDIS_ERR;
if ((p = readLine(r,NULL)) != NULL) {
elements = readLongLong(p);
root = (r->ridx == 0);
if (elements == -1) {
if (r->fn && r->fn->createNil)
obj = r->fn->createNil(cur);
else
obj = (void*)REDIS_REPLY_NIL;
if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}
moveToNextTask(r);
} else {
if (r->fn && r->fn->createArray)
obj = r->fn->createArray(cur,elements);
else
obj = (void*)REDIS_REPLY_ARRAY;
if (obj == NULL)
return REDIS_ERR;
if (elements > 0) {
cur->elements = elements;
cur->obj = obj;
r->ridx++;
r->rstack[r->ridx].type = -1;
r->rstack[r->ridx].elements = -1;
r->rstack[r->ridx].idx = 0;
r->rstack[r->ridx].obj = NULL;
r->rstack[r->ridx].parent = cur;
r->rstack[r->ridx].privdata = r->privdata;
} else
moveToNextTask(r);
}
if (root) r->reply = obj;
return REDIS_OK;
}
return REDIS_ERR;
}
首先得到当前构建节点的redisReadTask结构,调用readLine函数,解析出当前节点中包含的元素个数elements。
如果elements正确解析,调用r->fn->createArray创建一个数组类型的redisReply结构节点,将obj以及elements记录到cur中。
创建数组类型redisReply结构函数createArrayObject如下所示:
static void *createArrayObject(const redisReadTask *task, int elements) {
...
r = createReplyObject(REDIS_REPLY_ARRAY);
r->elements = elements;
...
if (task->parent) {
parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY);
parent->element[task->idx] = r;
}
return r;
}
如果task->parent不为NULL,说明当前新建的redisReply结构节点有父节点,根据当前task得到该父节点redisReply结构parent,然后将当前节点保存到父节点element数组中的task->idx索引处。
数组类型的redisReply结构节点创建完成之后, 接下来就是构建各个子节点。首先就是将r->ridx加1(ridx为0是根节点),同时初始化r->rtaskr->ridx结构,其中r->rstackr->ridx.idx为0表示接下来首先构建第一个子节点。
如果elements等于0,调用moveToNextTask,为下一个要创建的节点找到合适的位置。源码如下所示:
static void moveToNextTask(redisReader *r) {
redisReadTask *cur, *prv;
while (r->ridx >= 0) {
if (r->ridx == 0) {
r->ridx--;
return;
}
cur = &(r->rstack[r->ridx]);
prv = &(r->rstack[r->ridx-1]);
assert(prv->type == REDIS_REPLY_ARRAY);
if (cur->idx == prv->elements-1) {
r->ridx--;
} else {
assert(cur->idx < prv->elements);
cur->type = -1;
cur->elements = -1;
cur->idx++;
return;
}
}
}
其中cur和prv分别表示当前正处理节点的redisReadTask结构以及父节点的redisReadTask结构。如果cur->idx小于prv->elements,那么接下来,cur结构就要开始构建当前节点的下一个兄弟节点,此时cur->idx需要加1;如果cur->idx等于prv->elements的话,说明当前节点,已经是父节点最后一个孩子节点了,那么接下来,就要开始构建当前节点的叔叔节点了,因此r->ridx--,表示上移一层,从处理父节点的rediReadTask结构开始,继续进行判断;如果当前处理的节点是根节点,即r->ridx=0,直接把r->ridx置为-1之后直接返回。
上面就是回复解析api主要的工作流程,这里redisReply结构树以及redisReadTask结构作用比较晦涩难懂(记住redisReply是最终的树结构,而redisReadTask只是用来辅助构建树结构)。
示例程序可直接参考hiredis包中的example.c,本地启动一个redis-server测试即可。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "hiredis.h"
int main(int argc, char **argv) {
unsigned int j;
redisContext *c;
redisReply *reply;
const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";
int port = (argc > 2) ? atoi(argv[2]) : 6379;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
c = redisConnectWithTimeout(hostname, port, timeout);
if (c == NULL || c->err) {
if (c) {
printf("Connection error: %s\n", c->errstr);
redisFree(c);
} else {
printf("Connection error: can't allocate redis context\n");
}
exit(1);
}
/* PING server */
reply = static_cast<redisReply *>(redisCommand(c,"PING"));
printf("PING: %s\n", reply->str);
freeReplyObject(reply);
/* Set a key */
reply = static_cast<redisReply *>(redisCommand(c,"SET %s %s", "foo",
"hello world"));
printf("SET: %s\n", reply->str);
freeReplyObject(reply);
...
/* Disconnects and frees the context */
redisFree(c);
return 0;
}
https://github.com/redis/hiredis
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。