专栏首页蓝天Redis源码笔记-初步

Redis源码笔记-初步

1. 前言

Redis代码优美,注释也很到位,阅读起来会赏心悦目,大大降低了理解门槛。由于redis单线程几乎完成所有工作,整体逻辑是相当复杂的,涉及了太多状态,作者的技术深厚可见一斑。

Redis的单线程设计给出了一种优雅实现高性能服务思路,在实践中值得借鉴。需要注意Redis并不是严格的单线程,实际上它是多进程+多线程。为解决IO和慢速操作带的性能毛刺和卡顿,Redis实现引入的多进程和多线程。但是它的主体仍然是单线程的,单个线程完成网络IO操作和其它操作。

疑问:Redis选择slot数为16384,是一个偶数,没有采用质数,这个可能并不是一个最好的主意。

2. 名词

RESP

REdis Serialization Protocol

Redis序列化协议

AE

A simple Event drived programming library

一个简单的事务驱动编程库

ASAP

AOF

Append Only File

仅仅追加写文件

BIO

Backgroup IO

后台IO操作,三种涉及BIO:FSYNC、关闭文件和内存free,均为阻塞或慢操作

3. dict.c

哈希表的实现,哈希函数使用了siphash哈希算法。

3.1. siphash算法

一种非加密的64位哈希算法。

3.2. 核心函数

Redis中非常核心的算法——哈希表的实现在这个文件中,很多命令都有用到,比如set/hset等,它是除内存分配管理外的最基础实现。核心函数包含但不限于:dictFind、dictNext、dictAdd、dictDelete、dictFetchValue、dictGetHash、dictReplace、dictAddRaw、dictCreate、dictDelete、dictRelease、dictUnlink、dictRehash、dictEmpty、dictResize等。

3.3. 核心宏

CLUSTER_SLOTS

定义Redis集群Slots个数,值为16384

DICT_HT_INITIAL_SIZE

定义哈希表默认大小宏,值为4

dictSetVal

设置Value

dictSetKey

设置Key

dictCompareKeys

比较两个Key

dictHashKey

对Key求哈希

dictGetKey

取Key

dictGetVal

取Value

dictSlots

得到槽(slot)个数

dictSize

得到已用槽(slot)个数

dictGetSignedIntegerVal

取8字节有符号整数值

dictGetUnsignedIntegerVal

取8字节无符号整数值

dictGetDoubleVal

取double类型值

3.4. 核心结构体

3.4.1. dictEntry

定义了哈希节点的数据结构:

typedef struct dictEntry {
void *key; // Key
union { // 支持4种类型的值
void *val; // 字符串值,或二进制值
uint64_t u64; // 8字节无符号整数值
int64_t s64; // 8字节有符号整数值
double d; // 双精度浮点值
} v;
struct dictEntry *next; // 下一节点,链地址冲突解决法
} dictEntry;

4. Redis命令

各具体的命令并不做持久化(写AOF等)和传播给Slaves,这两项是公共操作,在上层统一执行,具体函数为server.c中的“void call(client *c, int flags)”,具体执行函数为server.c中的“void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)”。

4.1. SELECT命令

群里有人问select跟普通命令的效率,猜想select没有效率问题,阅读源代码也证实了这一点。因为它只是改变指针指向,所以不存在效率问题,自然比普通命令效率要高。

Redis支持的所有命令,都存储在类型为redisCommand函数表中,函数表名为redisCommandTable的数组中。

4.1.1. redisCommand结构体

如果以C++来理解redisCommand,可以将redisCommand看抽象基类,它定义了两个虚函数proc和getkeys_proc,各种command是它的具体实现。

// server.h
struct redisCommand {
char *name; // 命令名,比如:GET
redisCommandProc *proc; // 命令处理过程(函数指针)
// 命令参数的个数,用于检查命令请求的格式是否正确
// 如果这个值为负数-N,那么表示参数的数量大于等于N
// 注意:命令的名字本身也是一个参数。
int arity;
// 字符串形式的标识值, 这个值记录了命令的属性
// 比如是读命令还是写命令,是否允许在载入数据时使用,是否允许在 Lua 脚本中使用等
char *sflags; /* Flags as string representation, one char per flag. */
// sflags对应的二进制值,方便程序“与”、“或”等操作操作
int flags;    /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc; // 函数指针
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey;  /* The last argument that's a key */
int keystep;  /* The step between first and last key */
// 服务器执行这个命令所耗费的总时长
long long microseconds;;
// 服务器总共执行了多少次这个命令
long long calls;
};

4.1.2. redisCommandTable变量

redisCommandTable的部分定义:

// server.c
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
。。。。。。
{"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
。。。。。。
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
};

4.1.3. selectCommand函数

// server.c
void selectCommand(client *c) {
long id;
// 第一个参数值为DB的ID
if (getLongFromObjectOrReply(c, c->argv[1], &id,
"invalid DB index") != C_OK)
return;
// 集群不支持SELECT命令
if (server.cluster_enabled && id != 0) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
if (selectDb(c,id) == C_ERR) { // 切换DB
addReplyError(c,"DB index is out of range");
} else {
addReply(c,shared.ok);
}
}
// db.c
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum) // dbnum值由redis.conf中的databases决定,默认为16
return C_ERR;
c->db = &server.db[id];
return C_OK;
}

4.2. SET命令

SET命令主要是字典操作(dict)。

4.2.1. setCommand函数

// t_string.c
/* SET key value [NX] [XX] [EX ] [PX ] */
void setCommand(client *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_SET_NO_FLAGS;
// 以下一长段均为参数选项解析,
// 第一3个开始,因为第一个为命令SET自身,第二个为KEY,第三个为VALUE
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_XX))
{
flags |= OBJ_SET_NX;
} else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_NX))
{
flags |= OBJ_SET_XX;
} else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_PX) && next)
{
flags |= OBJ_SET_EX;
unit = UNIT_SECONDS;
expire = next;
j++;
} else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_EX) && next)
{
flags |= OBJ_SET_PX;
unit = UNIT_MILLISECONDS;
expire = next;
j++;
} else {
addReply(c,shared.syntaxerr); // 语法错误
return;
}
}
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
// t_string.c
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
。。。。。。
setKey(c->db,key,val);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}
// db.c
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) { // 性能开销主要在这,参见dictFind函数
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
incrRefCount(val);
removeExpire(db,key);
signalModifiedKey(db,key);
}
// db.c
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
*
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET)
signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
// dict.h
#define dictSetVal(d, entry, _val_) do { \
if ((d)->type->valDup) \
(entry)->v.val = (d)->type->valDup((d)->privdata, _val_); \
else \
(entry)->v.val = (_val_); \
} while(0)
// db.c
// KEY存在时覆盖写
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);
dictEntry auxentry = *de;
robj *old = dictGetVal(de);
// 内存策略由redis.conf中的maxmemory-policy决定
// LFU策略是4.0开始引入的(Least Frequently Used,最不经常使用)
// #define MAXMEMORY_FLAG_LRU (1<<0)
// #define MAXMEMORY_FLAG_LFU (1<<1)
// #define MAXMEMORY_FLAG_ALLKEYS (1<<2)
// #define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)
// #define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)
// #define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)
// #define MAXMEMORY_VOLATILE_TTL (2<<8)
// #define MAXMEMORY_VOLATILE_RANDOM (3<<8)
// #define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)
// #define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
// #define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
// #define MAXMEMORY_NO_EVICTION (7<<8)
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(old);
dictSetVal(db->dict, &auxentry, NULL);
}
dictFreeVal(db->dict, &auxentry);
}

4.2.2. dictAdd函数

// dict.c
// KEY不存在时新增
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}

4.2.3. dictFind函数

字典查找实现:

dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
// #define dictHashKey(d, key) (d)->type->hashFunction(key)
h = dictHashKey(d, key); // 对key求哈希值
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) { // 链式冲突解决法
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL; // 重哈希
}
return NULL;
}
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;

4.3. HSETNX命令

4.3.1. hsetnxCommand函数

// t_hash.c
void hsetnxCommand(client *c) {
robj *o;
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
hashTypeTryConversion(o,c->argv,2,3);
if (hashTypeExists(o, c->argv[2]->ptr)) {
addReply(c, shared.czero);
} else {
hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY);
addReply(c, shared.cone);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++;
}
}

4.3.2. hashTypeSet函数

// t_hash.c
int hashTypeSet(robj *o, sds field, sds value, int flags) {
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
// ziplistInsert
// ziplistPush
} else if (o->encoding == OBJ_ENCODING_HT) {
。。。。。。
dictAdd(o->ptr,f,v);
} else {
serverPanic("Unknown hash encoding");
}
}

5. Redis多路复用机制

5.1. AE是什么?

“AE”为“A simple event-driven programming library”的缩写,翻译成中文,即一个简单的事件驱动编程库。就Linux而言,可简单理解为对epoll的封装。

5.2. 多路复用选择aeApiPoll

Redis支持select、epoll、kqueue和evport四种模式,编译Redis时决定采用哪一种,因此运行时只会有一种有效。优先顺序是:evport -> epoll -> kqueue -> select,其中select为兜底用,因为所有系统都会支持select。

对应的源码文件,分别为:

evport

ae_evport.c

Solaris

epoll

ae_epoll.c

Linux系统

kqueue

ae_kqueue.c

BSD类系统

select

ae_select.c

其它不支持epoll、evport和kqueue系统

桥接它们的文件则是ae.c,桥接代码:

/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

宏HAVE_EVPORT、HAVE_EPOLL、HAVE_KQUEUE则在config.h中决议出:

#ifdef __sun
#include
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif

5.3. 多路复用调用aeCreateFileEvent

自main函数开始的调用过程(读事件和写事件的顺序,作者在这里用了个小技巧,支持优先响应是查询,即立即返回查询结果):

int main(int argc, char **argv) { // server.c
。。。。。。
// aeMain(server.el);
void aeMain(aeEventLoop *eventLoop) // ae.c
{
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop); // 这很重要,写AOF文件在这里进行
// aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { // ae.c
// linux实际调用的是ae_epoll.c中的aeApiPoll函数
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
// 一般优先处理读事件,然后再写事件,但有时为立即响应是查询则写优先
/* Normally we execute the readable event first,
and the writable event laster. */
int invert = fe->mask & AE_BARRIER;
/* Fire the readable event if the call sequence is not inverted. */
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
/* Fire the writable event. */
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
/* If we have to invert the call,
fire the readable event now after the writable one. */
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
// 函数aeCreateFileEvent中设置rfileProc和wfileProc
// rfileProc和wfileProc的初始化均在aeCreateFileEvent中完成
}
}
}
}
。。。。。。
}
// 初始化rfileProc和wfileProc
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd]; // events是一个以fd为下标的数组,0查找
if (aeApiAddEvent(eventLoop, fd, mask) == -1) // fd放到epoll或select等中
return AE_ERR;
fe->mask |= mask; // 读写事件
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData; // 附加数据
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd; // 这个主要给select时用,epoll等不需要
return AE_OK;
}

6. 网络收发过程

6.1. 过程简要

main
-> initServer
-> aeCreateFileEvent(acceptTcpHandler)
-> createClient
-> aeCreateFileEvent(readQueryFromClient)
-> read
-> processCommand
-> lookupCommand
-> call(c,CMD_CALL_FULL) // call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves
-> c->cmd->proc(c) // 这个proc就是各种redis命令处理过程,比如:setCommand、hgetCommand等

6.2. 接受连接请求acceptTcpHandler

在main函数中,会为accept注册一个事件acceptTcpHandler:

// server.c
int main(int argc, char **argv) {
void initServer(void) {
// 支持多个IP,即redis.conf中可以bind多个IP
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], // 以读事件方式加入到epoll中
AE_READABLE, acceptTcpHandler, NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
}
}
// networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// 一次性连接接受max个连接,以提升效率
while(max--) {
// anetTcpAccept处理了中断ENTR,方式是重试到成功或出错
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK) // EWOULDBLOCK是正常的
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip);
}
}
// networking.c
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// createClient为aceeptCommonHandler调用的核心函数
// 在createClient中会为fd关联读事件过程,并加入到epoll中
c = createClient(fd);
// 控制连接数不超过redis.conf上配置的最大数
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++; // 被拒绝的连接数增一
freeClient(c);
return;
}
// 保护模式处理
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
// 保护模式下,只允许127.0.0.1连接
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
char *err = "-DENIED Redis is running in protected mode because protected。。。。。。";
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++; // 被拒绝的连接数增一
freeClient(c);
return;
}
}
server.stat_numconnections++; // 当前连接数增一
c->flags |= flags;
}

6.3. 创建client对象createClient

// networking.c
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// 如果redis.conf中开启了tcpkeepalive,配置项名tcp-keepalive,单位:秒
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 注册c到epoll,并关联readQueryFromClient,事件为读事件AE_READABLE
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
selectDb(c,0); // 默认选择第1个DB,调用者需要执行select命令切换
。。。。。。
}

6.4. 读取命令readQueryFromClient

// networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*)privdata;
int nread = read(fd, c->querybuf+qblen, readlen);
processInputBufferAndReplicate(c);
}
/* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-slaves, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c); // 处理命令
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
// 处理主从复制
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}

6.5. 命令处理processCommand

processInputBuffer调用processCommand,大量逻辑在processCommand中:

// networking.c
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
void processInputBuffer(client *c) {
server.current_client = c;
int processCommand(client *c) {
// 单独处理QUIT命令
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
// 查找命令的处理
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",。。。);
sdsfree(args);
return C_OK;
}
else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// 参数个数不对
addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);
return C_OK;
}
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
// 检查是否有设置密码,和密码是否已经验证通过
addReply(c,shared.noautherr);
return C_OK;
}
// 检查是否需要重定向
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand)) {
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
}
else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 检查最大内存,如果超过配置的maxmemory,
// 则返回错误“-OOM command not allowed when used memory > 'maxmemory'.”
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the propagation
* of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
/* It was impossible to free enough memory, and the command the client
* is trying to execute is denied during OOM conditions? Error. */
if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
addReply(c, shared.oomerr);
return C_OK;
}
}
// 磁盘有问题不接受写请求
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) {
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReply(c, shared.bgsaveerr);
else
addReplySds(c, sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}
// 不接受写请求,如果没有足够多好的slave
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write) {
addReply(c, shared.noreplicaserr);
return C_OK;
}
// 只读的slave不接受写请求
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
// 只接受SUB、UNSUB、PSUB、PUNSUB和PING命令
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
// 正在加载DB,直接返回错误
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
/* Lua script too slow? Only allow a limited number of commands. */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
/* Add a new command into the MULTI commands queue */
queueMultiCommand(c); // 链上新的事务子命令(队列结构),等到EXEC时一块执行
addReply(c,shared.queued); // 向client返回QUEUED
} else {
call(c,CMD_CALL_FULL); // 调用命令处理过程
c->woff = server.master_repl_offset; // 更新复制偏移
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
}

7. 持久化和复制

call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves。

int processCommand(client *c) {
。。。。。。
void call(client *c, int flags) {
。。。。。。
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}

7.1. propagate函数(aof&replication)

/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + PROPAGATE_NONE (no propagation of command at all)
* + PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + PROPAGATE_REPL (propagate into the replication link)
*
* This should not be used inside commands implementation. Use instead
* alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc); // 写AOF
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc); // 推送给Slaves
}

7.2. 写AOF文件feedAppendOnlyFile

feedAppendOnlyFile并不实际写文件,而只是将内容组织到AOF buffer中。写AOF文件是在每次epoll之前进行,redis-server启动时注册了函数beforeSleep,它在每次epoll之前被调用。beforeSleep调用flushAppendOnlyFile将AOF buffer写入到AOF文件。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
。。。。。。
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON) // aof_state值由redis.conf中的appendonly控制
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
}

redis-server启动:

int main(int argc, char **argv) {
aeSetBeforeSleepProc(server.el, beforeSleep); // 注册回调beforeSleep
aeSetAfterSleepProc(server.el, afterSleep);
aeMain(server.el);
}

调用beforeSleep:

void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop); // 将AOF buffer写入到AOF文件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); // epoll
}
}

调用flushAppendOnlyFile函数将AOF buffer写入到AOF文件:

/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
。。。。。。
/* Unblock all the clients blocked for synchronous replication
* in WAIT. */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
。。。。。。
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
。。。。。。
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
。。。。。。
}

写AOF文件:

/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
// aofWrite调用write将AOF buffer写入到AOF文件,处理了ENTR,其它没什么
ssize_t nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
。。。。。。
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
return; /* We'll try again on the next call... */
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
}
。。。。。。
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
// redis_fsync是一个宏,Linux实际为fdatasync,其它为fsync
// 所以最好不要将redis.conf中的appendfsync设置为always,这极影响性能
// 注意,Redis的实现并没有处理fsync和close的返回值,所以还不是最严格的
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
}
else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) {
// 如果已在sync状态,则不再重复
// BIO线程会间隔设置sync_in_progress
// if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
//     sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
if (!sync_in_progress)
// eversec性能并不那么糟糕,因为它:
// 后台方式执行fsync
// Redis并不是严格意义上的单线程,实际上它创建一组BIO线程,专门处理阻塞和慢操作
// 这些操作就包括FSYNC,另外还有关闭文件和内存的free两个操作。
// 不像always,EVERYSEC模式并不立即调用fsync,
// 而是将这个操作丢给了BIO线程异步执行,
// BIO线程在进程启动时被创建,两者间通过bio_jobs和bio_pending两个
// 全局对象交互,其中主线程负责写,BIO线程负责消费。
aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}

7.3. 数据复制replicationFeedSlaves

这里也不会真正将数据发给Slaves,而只是将数据放入到replication buffer中。

/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
。。。。。。
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) { // 遍历所有的slaves
client *slave = ln->value; // Slave也被当作client
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}

8. 进程启动时做了些什么?

进程启动时事项:

1) 为所有配置设置默认值

2) 加载配置文件redis.conf,覆盖默认值

3) 安装信号处理器

4) 初始化全局状态

5) 创建epoll,启动TCP监控

6) 创建BIO线程

7) 加载module

8) 加载AOF或RDB

9) 设置beforeSleep和afterSleep两个回调

10) 进入epoll循环

int main(int argc, char **argv) {
// 初始化所有配置,为它们设置默认值
initServerConfig();
// 如果是sentinel模式,额外做相关初始化
// sentinel_mode的值由命令行参数中是否包含参数“--sentinel”决定
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// redis-server和redis-check-rdb、redis-check-aof共享了同一个main函数
// 进入redis_check_rdb_main或redis_check_aof_main后不会再返回,
// 也就是这两个函数后的代码均不会被执行。
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
// 加载redis.conf,
// 包括初始化module列表loadmodule_queue,
// 注意会将redis-server的命令行参数一并传递给module
loadServerConfig(configfile,options);
// 置为deamon方式,
// 可通过redis.conf中的daemonize控制是否以deamon方式运行
if (background) daemonize();
// 对redis-server核心的初始化,包括但不限于:
// 1) 忽略HUP和PIPE信号
// 2) 安装TERM和INT两个信号的处理器
// 3) 初始化全局的redisServer对象server
// 4) 注册定时器回调serverCron
// 5) 注册accept的回调acceptTcpHandler
// 6) 注册UNIX套接字回调acceptUnixHandler
// 7) 注册模块回调moduleBlockedClientPipeReadable
// 8) 创建或追加模式打开AOF文件
// 9) 集群server.cluster初始化
// 10) 复制脚本初始化
// 11) 初始化lua运行环境
// 12) 慢日志初始化
// 13) 初始化延迟监控(latency monitor)
// 14) 创建BIO线程,由bio.h中的BIO_NUM_OPS决定个数,当前为3个
initServer();
// 如果是daemon方式,则创建pid文件
if (background || server.pidfile) createPidFile();
// 修改进程的title,加入状态和端口号,这样对ps等更为友好
redisSetProcTitle(argv[0]);
// 进程启动时显示LOGO,
// 可通过redis.conf中的always-show-logo控制是否显示
redisAsciiArt();
// 检查TCP的“/proc/sys/net/core/somaxconn”
checkTcpBacklogSettings
if (server.sentinel_mode) {
sentinelIsRunning();
} else {
// 如果是Linux,
// 检查“/proc/sys/vm/overcommit_memory”
// 和“/sys/kernel/mm/transparent_hugepage/enabled”的值。
// “/proc/sys/vm/overcommit_memory”值需要为1,
// “/sys/kernel/mm/transparent_hugepage/enabled”需要为“never”
linuxMemoryWarnings();
// 加载所有的模块,如果有一个加载失败则启动失败
// 具体有哪些模块,由redis.conf中的loadmodule决定,如:
// loadmodule /path/to/my_module.so
// loadmodule /path/to/other_module.so
// 每个module一行
moduleLoadFromQueue();
// 从磁盘加载RDB或AOF到内存
// 如果开启了AOF,
// 则调用loadAppendOnlyFile加载AOF文件,这个时候不会加载RDB文件
// 没开启AOF时,调用rdbLoad加载RDB文件
// 不管是AOF还是RDB,加载不成功时,进程均不能启动。
loadDataFromDisk();
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
if (server.ipfd_count > 0)
serverLog(LL_NOTICE,"Ready to accept connections");
if (server.sofd > 0)
serverLog(LL_NOTICE,
"The server is now ready to accept connections at %s",
server.unixsocket);
}
// redis.conf中的maxmemory值过小,进行警告提示
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
// 注册回调beforeSleep,
// 对于eversec模式的appendfsync将在这个回调中进行
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
// 进入epoll主循环:
// BSD为kqueue死循环,
// Solaris为evport死循环,
// 其它为select死循环
//
// 网络数据的收和发,以及所有COMMAND的操作均在这个循环中完成
aeMain(server.el);
// 关闭epoll,释放相关资源
aeDeleteEventLoop(server.el);
return 0;
}

9. 核心对象

Redis虽然采用C语言实现,但内含明显的对象,核心的对象包括:

9.1. redisServer

这是一个超大的对象,实为redis-server的Context对象,承载了redis-server的各种全局状态。下面只列出它的一小部分:

// server.h
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
char *configfile; /* Absolute config file path, or NULL */
/* Networking */
int port; /* TCP listening port */
/* AOF persistence */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
/* Logging */
char *logfile; /* Path of log file */
/* Replication (master) */
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
/* Replication (slave) */
char *masterhost; /* Hostname of master */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
/* Scripting */
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
/* Latency monitor */
dict *latency_events;
};

9.2. client

一个client对象,可以是一个Client,也可以是一个slave,也可以是一个master。

// server.h
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
list *reply; /* List of reply objects to send to the client. */
int authenticated; /* When requirepass is non-NULL. */
int replstate; /* Replication state if this is a slave. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
};

9.3. redisCommand

Redis命令结构:

struct redisCommand {
char *name; // Redis命令名,如:GET等
redisCommandProc *proc; // 处理该命令的过程
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags;    /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey;  /* The last argument that's a key */
int keystep;  /* The step between first and last key */
long long microseconds, calls;
};

9.4. redisDb

Redis的DB结构:

/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict;             /* The keyspace for this DB */
dict *expires;          /* Timeout of keys with a timeout set */
dict *blocking_keys;    /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys;       /* Blocked keys that received a PUSH */
dict *watched_keys;     /* WATCHED keys for MULTI/EXEC CAS */
int id;                 /* Database ID */
long long avg_ttl;      /* Average TTL, just for stats */
list *defrag_later;     /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

9.5. clusterNode

Redis集群节点结构:

typedef long long mstime_t; /* millisecond time type. */
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags;      /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots;   /* Number of slots handled by this node */
int numslaves;  /* Number of slave nodes, if this is a master */
// 所有的slave节点
struct clusterNode **slaves; /* pointers to slave nodes */
// master节点
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
// 最近一次发送ping的时间
mstime_t ping_sent;      /* Unix time we sent latest ping */
// 最近一次接收pong的时间
mstime_t pong_received;  /* Unix time we received the pong */
mstime_t fail_time;      /* Unix time when FAIL flag was set */
mstime_t voted_time;     /* Last time we voted for a slave of this master */
mstime_t repl_offset_time;  /* Unix time we received offset for this node */
mstime_t orphaned_time;     /* Starting time of orphaned master condition */
long long repl_offset;      /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */
int port;                   /* Latest known clients port of this node */
int cport;                  /* Latest known cluster port of this node. */
clusterLink *link;          /* TCP/IP link with this node */
list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

9.6. clusterState

Redis集群状态结构:

typedef struct clusterState {
clusterNode *myself;  /* This node */
uint64_t currentEpoch;
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count;    /* Number of votes received so far. */
int failover_auth_sent;     /* True if we already asked for votes. */
int failover_auth_rank;     /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* The followign fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
} clusterState;

10. 主备同步

待续。

11. 主备切换

待续。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Oops错误

    在at91rm9200下写了一个spi的驱动,加载后,运行测试程序时,蹦出这么个吓人的东西: Unable to handle kernel paging r...

    一见
  • Exploring the ext3 Filesystem

    Multiple Journaling Modes in the ext3 Filesystem

    一见
  • Fast Scatter-Gather I/O

    Some applications may need to read or write data to multiple buffers, which are ...

    一见
  • Elasticsearch 7.x生产配置

    版权声明:本文为博主原创文章,欢迎转载。 ...

    程裕强
  • ROS 2 Eloquent Elusor安装和使用汇总

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    zhangrelay
  • SAP Fiori应用发生超时错误的一个可能原因

    Yesterday I spent almost the whole day to resolve a timeout issue in one CRM Fio...

    Jerry Wang
  • GTX 1080+Ubuntu16.04+CUDA8.0+cuDNN5.0+TensorFlow

    GTX 1080+Ubuntu16.04+CUDA8.0+cuDNN5.0+TensorFlow 安装指导

    用户1908973
  • Tree - Information Theory

    This will be a series of post about Tree model and relevant ensemble method, inc...

    风雨中的小七
  • SAP Fiori里的两种锁机制

    This approach is used in SAP CRM Fiori. Suppose user Jerry has opened a given o...

    Jerry Wang
  • redis配置详解(中英文)

    V2.8.21: (中英字幕同步) # Redis configuration file example #* Redis 配置文件例子 # Note on...

    三丰SanFeng

扫码关注云+社区

领取腾讯云代金券