Hiredis源码阅读(一)

Hiredis源码解析

Hiredis库主要包含三类API:同步api、异步api以及回复解析api。首先介绍一下同步api以及回复解析api。

1、同步api

1.1、建立tcp连接

函数原型:

redisContext *redisConnect(const char *ip, int port);

redisConnect函数用来创建一个上下文结构redisContext,并向reids服务器发起连接请求。源码如下所示:

redisContext *redisConnect(const char *ip, int port) {
    redisContext *c;
    c = redisContextInit();
    if (c == NULL)
        return NULL;
    c->flags |= REDIS_BLOCK;
    redisContextConnectTcp(c,ip,port,NULL);
    return c;
}

其中redisConext用来保存与redis服务器连接状态相关信息、输出缓冲区以及回复解析器。结构如下所示:

typedef struct redisContext {
    int err; 
    char errstr[128]; 
    int fd;
    int flags;
    char *obuf; 
    redisReader *reader;
    ...
} redisContext;

其中fd表示与redis服务器建立连接的socket描述符;而flag表示客户端标志位,表示客户端当前的状态;obuf用来保存输出缓存,用户调用reidsCommand向redis发送命令时,命令字符串首先会被追加到obuf中;reader是一个回复解析器,后续会介绍。

1.2 发送命令 & 接收回复

函数原型:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand函数返回NULL表示有错误发生,可以通过检查redisContext中的err得到错误类型;如果执行完成,则返回值是一个redisReply指针,包含了Redis的恢复信息。

redisCommand主要通过redisvCommand实现,而redisvCommand主要是通过redisvAppendCommand和__redisBlockForReply两个实现。

redisvAppendCommand源码如下所示:

int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
    ...
    len = redisvFormatCommand(&cmd,format,ap);
    if (len == -1) {
        __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
        return REDIS_ERR;
    } else if (len == -2) {
        __redisSetError(c,REDIS_ERR_OTHER,"Invalid format string");
        return REDIS_ERR;
    }
    if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
        free(cmd);
        return REDIS_ERR;
    }
    ...
}

redisvAppendCommand函数作用是解析用户的输入,并将用户输入的命令字符串转换成redis统一的格式,暂存到redisContext.obuf中。

__redisBlockForReply源码如下所示:

void *__redisBlockForReply(redisContext *c) {
    void *reply;

    if (c->flags & REDIS_BLOCK) {
        if (redisGetReply(c,&reply) != REDIS_OK)
            return NULL;
        return reply;
    }
    return NULL;
}

int redisGetReply(redisContext *c, void **reply) {
    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
        return REDIS_ERR;
        
    if (aux == NULL && c->flags & REDIS_BLOCK) {
        do {
            if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                return REDIS_ERR;
        } while (!wdone);
        
        do {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;
            if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                return REDIS_ERR;
        } while (aux == NULL);
    }
    if (reply != NULL) *reply = aux;
    return REDIS_OK;
}

redisGetReply中,首先是循环调用redisBufferWrite,将输出c->obuf中的所有内容发送给redis,然后循环调用redisBufferRead,读取redis的回复,调用redisGetReplyFromReader对回复信息进行解析。

redisBufferRead函数主要是从socket读取数据到buf中,然后通过函数redisReaderFeed,将bug内容追加到解析器的输入缓存中。

2、回复解析api

2.1、解析器缓存

解析器结构redisReader,源码如下所示:

typedef struct redisReader {
    char *buf; 
    size_t pos; 
    size_t len; 
    size_t maxbuf; 
    ...
} redisReader;

buf就是输入缓存,redisReaderFeed函数把读取到的redis恢复信息都暂存于此;len表示当前缓存的容量;pos表示当前缓存的读取索引(每次读取输入缓存时,都从reader->buf + reader->pos处开始读取,读取数据之后,会增加pos的值);maxbuf表示输入缓存最大的允许闲置空间,当buf的空闲空间大于maxbuf时,就会buf,重新申请空间(默认值是16k)。

这里redisReaderFeed就是从socket中读取redis回复信息,追加到解析器缓存中。

2.2、解析回复信息

上述redisGetReply函数中,将redis回复信息追加到解析器输入缓存后,就会调用redisGetReplyFromReader对解析器的输入缓存进行信息解析,最终以redisReply结构呈现。源码如下所示:

typedef struct redisReply {
    int type; 
    long long integer; 
    int len; 
    char *str; 
    size_t elements; 
    struct redisReply **element; 
} redisReply;

其中type表示redis回复信息的类型,其中基本类型主要有下面几种:

  • REDIS_REPLY_STATUS:状态回复,状态信息以'+'开头。str属性保存Redis回复的状态信息字符串,该字符串的长度保存在len属性中。
  • REDIS_REPLY_ERROR:错误回复,错误信息以'-'开头。str属性保存Redis回复的错误信息字符串,该字符串的长度保存在len属性中。
  • REDIS_REPLY_INTEGER:整数回复,整数信息以':'开头。integer 属性保存Redis回复的整数值。
  • REDIS_REPLY_STRING:单行字符串回复,这种信息以'$'开头。str属性保存Redis回复的字符串信息,该字符串的长度保存在len属性中。
  • REDIS_REPLY_NIL:Redis回复”nil”。 而 REDIS_REPLY_ARRAY:数组回复,也就是嵌套回复,数组信息以'*'开头,后面数组元素个数。数组中的元素可以是任意类型。

调用redisReaderGetReply解析之后,最终会形成redisReply结构树,非叶子节点只能是REDIS_REPLY_ARRAY类型,叶子节点只能是上述提到的基本类型。

例如,redis回复信息是*3\r\n*3\r\n:11\r\n:12\r\n:13\r\n*3\r\n:21\r\n:22\r\n:23\r\n:31\r\n,那么最终形成的树如下所示:

1493453014_22_w540_h277.png

使用redisReadTask任务结构来解析回复信息,构建每个redisReply结构节点,填充到树中合适的位置。源码如下所示:

typedef struct redisReader {
    ...
    redisReadTask rstack[9];
    int ridx; 
    void *reply; 
    redisReplyObjectFunctions *fn;
    void *privdata;
} redisReader;

redisReadTask结构数组rstak大小是9;其中rtask0表示redisReply结构树中的根节点;ridx表示当前处理第几层节点;fn包含了用于生成各种类型redisReply结构的函数;reply指向redisReply结构树中的根节点。

redisReadTask结构如下所示:

typedef struct redisReadTask {
    int type;
    int elements;
    int idx; 
    void *obj; 
    struct redisReadTask *parent; 
    void *privdata; 
} redisReadTask;

这个地方有点绕

  • type表示redisReadTask结构当前处理的回复信息类型;
  • elements表示当前构建的REDIS_REPLY_ARRAY类型的redisReply结构节点中包含的子节点数目(上述redisReply结构节点中,数组element中的元素个数);
  • idx表示当前构建的redisReply结构节点,在其父节点redisReply中element数组中的索引;
  • obj指向当前正在构建的REDIS_REPLY_ARRAY类型的redisReply结构节点;
  • partent表示当前正在处理节点的父节点;

redisReaderGetReply源码如下所示:

int redisReaderGetReply(redisReader *r, void **reply) {
    if (reply != NULL)
        *reply = NULL;
    if (r->err)
        return REDIS_ERR;
    if (r->len == 0)
        return REDIS_OK;
    // 初始化操作
    while (r->ridx >= 0)
        if (processItem(r) != REDIS_OK)
            break;
    if (r->err)
        return REDIS_ERR;
        
    if (r->pos >= 1024) {
        sdsrange(r->buf,r->pos,-1);
        r->pos = 0;
        r->len = sdslen(r->buf);
    }
    
    if (r->ridx == -1) {
        if (reply != NULL)
            *reply = r->reply;
        r->reply = NULL;
    }
    return REDIS_OK;
}

回复解析主要步骤:

  1. 设置r->ridx为0,初始化r->rstack0,接下来开始构建根节点
  2. 循环调用processItem函数,直到r->ridx再次等于-1(深度优先),构建一棵redisReply结构树

processItem函数首先得到当前构建节点的结构redisReadTask *cur = &(r->rstack[r->ridx]),然后从输入缓存中读取首个字符,用来判断回复信息的类型,保存到cur->type中。

根据得到的回复类型信息,调用不同的函数处理不同的类型。这里重点看一下处理数组类型的processMultiBulkItem的实现逻辑:

static int processMultiBulkItem(redisReader *r) {
    redisReadTask *cur = &(r->rstack[r->ridx]);
    ...
    if (r->ridx == 8)
        return REDIS_ERR;

    if ((p = readLine(r,NULL)) != NULL) {
        elements = readLongLong(p);
        root = (r->ridx == 0);
        if (elements == -1) {
            if (r->fn && r->fn->createNil)
                obj = r->fn->createNil(cur);
            else
                obj = (void*)REDIS_REPLY_NIL;
            if (obj == NULL) {
                __redisReaderSetErrorOOM(r);
                return REDIS_ERR;
            }
            moveToNextTask(r);
        } else {
            if (r->fn && r->fn->createArray)
                obj = r->fn->createArray(cur,elements);
            else
                obj = (void*)REDIS_REPLY_ARRAY;
                
            if (obj == NULL) 
                return REDIS_ERR;
            
            if (elements > 0) {
                cur->elements = elements;
                cur->obj = obj;
                r->ridx++;
                r->rstack[r->ridx].type = -1;
                r->rstack[r->ridx].elements = -1;
                r->rstack[r->ridx].idx = 0;
                r->rstack[r->ridx].obj = NULL;
                r->rstack[r->ridx].parent = cur;
                r->rstack[r->ridx].privdata = r->privdata;
            } else
                moveToNextTask(r);
        }
        if (root) r->reply = obj;
        return REDIS_OK;
    }
    return REDIS_ERR;
}

首先得到当前构建节点的redisReadTask结构,调用readLine函数,解析出当前节点中包含的元素个数elements。

如果elements正确解析,调用r->fn->createArray创建一个数组类型的redisReply结构节点,将obj以及elements记录到cur中。

创建数组类型redisReply结构函数createArrayObject如下所示:

static void *createArrayObject(const redisReadTask *task, int elements) {
    ...
    r = createReplyObject(REDIS_REPLY_ARRAY);
    r->elements = elements;
    ...
    if (task->parent) {
        parent = task->parent->obj;
        assert(parent->type == REDIS_REPLY_ARRAY);
        parent->element[task->idx] = r;
    }
    return r;
}

如果task->parent不为NULL,说明当前新建的redisReply结构节点有父节点,根据当前task得到该父节点redisReply结构parent,然后将当前节点保存到父节点element数组中的task->idx索引处。

数组类型的redisReply结构节点创建完成之后, 接下来就是构建各个子节点。首先就是将r->ridx加1(ridx为0是根节点),同时初始化r->rtaskr->ridx结构,其中r->rstackr->ridx.idx为0表示接下来首先构建第一个子节点。

如果elements等于0,调用moveToNextTask,为下一个要创建的节点找到合适的位置。源码如下所示:

static void moveToNextTask(redisReader *r) {
    redisReadTask *cur, *prv;
    while (r->ridx >= 0) {
        if (r->ridx == 0) {
            r->ridx--;
            return;
        }
        
        cur = &(r->rstack[r->ridx]);
        prv = &(r->rstack[r->ridx-1]);
        assert(prv->type == REDIS_REPLY_ARRAY);
        if (cur->idx == prv->elements-1) {
            r->ridx--;
        } else {
            assert(cur->idx < prv->elements);
            cur->type = -1;
            cur->elements = -1;
            cur->idx++;
            return;
        }
    }
}

其中cur和prv分别表示当前正处理节点的redisReadTask结构以及父节点的redisReadTask结构。如果cur->idx小于prv->elements,那么接下来,cur结构就要开始构建当前节点的下一个兄弟节点,此时cur->idx需要加1;如果cur->idx等于prv->elements的话,说明当前节点,已经是父节点最后一个孩子节点了,那么接下来,就要开始构建当前节点的叔叔节点了,因此r->ridx--,表示上移一层,从处理父节点的rediReadTask结构开始,继续进行判断;如果当前处理的节点是根节点,即r->ridx=0,直接把r->ridx置为-1之后直接返回。

上面就是回复解析api主要的工作流程,这里redisReply结构树以及redisReadTask结构作用比较晦涩难懂(记住redisReply是最终的树结构,而redisReadTask只是用来辅助构建树结构)。

3、示例程序

示例程序可直接参考hiredis包中的example.c,本地启动一个redis-server测试即可。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "hiredis.h"

int main(int argc, char **argv) {
    unsigned int j;
    redisContext *c;
    redisReply *reply;
    const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";
    int port = (argc > 2) ? atoi(argv[2]) : 6379;

    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }

    /* PING server */
    reply = static_cast<redisReply *>(redisCommand(c,"PING"));
    printf("PING: %s\n", reply->str);
    freeReplyObject(reply);

    /* Set a key */
    reply = static_cast<redisReply *>(redisCommand(c,"SET %s %s", "foo",
    		"hello world"));
    printf("SET: %s\n", reply->str);
    freeReplyObject(reply);
    ...
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

参考

https://github.com/redis/hiredis

http://yaocoder.blog.51cto.com/2668309/1297031

http://blog.csdn.net/kingqizhou/article/details/8104693

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小灰灰

Java 动手写爬虫: 二、 深度爬取

第二篇 前面实现了一个最基础的爬取单网页的爬虫,这一篇则着手解决深度爬取的问题 简单来讲,就是爬了一个网页之后,继续爬这个网页中的链接 1. 需求背景 背景...

73010
来自专栏分布式系统和大数据处理

Command模式入门

提起Command模式,我想没有什么比遥控器的例子更能说明问题了,本文将通过它来一步步实现GOF的Command模式。

1092
来自专栏逸鹏说道

c# 温故而知新: 线程篇(一) 下

Abort 方法: 其实 Abort 方法并没有像字面上的那么简单,释放并终止调用线程,其实当一个线程调用 Abort方法时,会在调用此方法的线程上引发一个异常...

2656
来自专栏Unity

检测Unity客户端无效的预设资源

4、不被代码间接引用,如预设Anniver_01,在代码中是"Anniver_"+number,这种暂时没有好办法解决。

2674
来自专栏Java进阶架构师

03:SpringBoot整合SpringDataJPA实现数据库的访问(二)

首先回忆一下,前面我们创建studentRepo类继承JpaRepository<T,ID>接口,即可实现最基本的crud。如下:

952
来自专栏GreenLeaves

Linq基础知识之延迟执行

Linq中的绝大多数查询运算符都有延迟执行的特性,查询并不是在查询创建的时候执行,而是在遍历的时候执行,也就是在enumerator的MoveNext()方法被...

25110
来自专栏菩提树下的杨过

redis 学习笔记(7)-cluster 客户端(jedis)代码示例

上节学习了cluster的搭建及redis-cli终端下如何操作,但是更常用的场景是在程序代码里对cluster读写,这需要redis-client对clust...

2268
来自专栏GreenLeaves

C# 文件读写系列二

读取文件原则上非常简单,但它不是通过FileInfo和DirectoryInfo来完成的,关于FileInfo和DirectoryInfo请参考C# 文件操作系...

3389
来自专栏芋道源码1024

哪个更快:Java 堆还是本地内存

使用Java的一个好处就是你可以不用亲自来管理内存的分配和释放。当你用new关键字来实例化一个对象时,它所需的内存会自动的在Java堆中分配。堆会被垃圾回收器进...

1184
来自专栏蓝天

可epoll队列

就可以使用epoll来监控队列中是否有数据的队列,当然也支持select和poll。

1032

扫码关注云+社区

领取腾讯云代金券