前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯云 Redis 集群版配置管理揭秘 ( 上 )

腾讯云 Redis 集群版配置管理揭秘 ( 上 )

原创
作者头像
白老大
修改2017-08-14 09:32:35
3.5K0
修改2017-08-14 09:32:35
举报
文章被收录于专栏:白老大的专栏白老大的专栏

导语

腾讯云 Redis(CRS)集群版已经有数千用户,售出数十T容量,那么 CRS 是如何做配置管理的呢?通用的集群系统都需要做配置管理分发,成员健康度检查,希望能带给您启发。

CRS 集群版改造自 QQ 后台存储数据库 Grocery,拥有十多年的技术积累与传承,由 SNG 即通平台部公共组件组多年研发,数据运营部 DBC 组持续运维运营。目前部署有上万台的集群,每秒承受上亿的访问。CRS 集群主要是由管理机、接入机、存储机三种角色组成。配置中心会部署在管理机上,配置客户端则会部署在集群的每台机器上。

配置管理模块,由上图的“配置中心” 与 “配置客户端”组成,是一套C++实现的CS架构。配置中心是一个单进程多线程架构,每个线程负责单独的功能。配置中心进程启动后,首先进行初始化,然后启动各个工作线程。初始化的分析因为不是重点,所以放在文末。现在介绍各个主要的工作模块:

  1. 配置加载:将配置信息从DB加载到内存中 (DB-> 服务端配置)
  2. 存活更新:将VSERVER的存活状态做改变 (客户端状态 -> 服务端配置)
  3. 筛选机器:筛选出需要接收配置的机器 (客户端状态 compare 服务端配置)
  4. 推送配置:将配置推送到指定机器 (服务端配置 -> 客户端)
  5. 接收心跳:接收心跳并更新客户端状态 (客户端 -> 服务端配置)

主要的角色:配置客户端 <-> 配置中心 (客户端状态,服务端配置) <- DB

配置加载

该模块由一个独立的线程LoadConfigThread实施,会把数据库中最新的配置信息加载到共享内存,加载前会有一些合法性校验。

代码语言:javascript
复制
 iRet = pthread_create(&pid, NULL, LoadConfigThread, NULL);
 if (iRet)
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_ERROR_, "Create LoadConfigThread error!\n");
 return -1;
 }

我们一般是如何来更新集群的配置信息呢?

a. 运营系统提交的 DML语句 更改DB中的配置信息; b. 执行 "update config_state_t set cfg_seq=cfg_seq+1;" 更新序列号;

该模块的逻辑包裹在while循环中,循环间隔 sleep(1),这也就是说,当我们在db中更新配置后,大概1秒会加载到共享内存中;

线程会在DB中执行 "Select need_load,cfg_seq From T_SYSCONF",主要是为了取得cfg_seq这个序列号。(历史上会用need_load字段的信息,但现在已弃用,不作为需要加载的标志);

线程每读一次MySQL,都会执行这么一条语句,以更新最后一次读MySQL的时间。update T_SYSCONF set master_cc_read_db_last_time = unix_timestamp() ;这是用于主备配置中心的死机切换判断的。更新失败上报 "646280 主CC设置最后读db时间:失败",更新成功上报 " 和 "646281 主CC设置最后读db时间:成功"。我们通过select from_unixtime(master_cc_read_db_last_time) from T_SYSCONF;就能看到,每秒master_cc_read_db_last_time值都会被更新一次。

如果 cfg_seq已经与进程的全局变量 g_ddwDbSeq不同,则意味着需要把DB的最新配置信息加载到共享内存中了,因为运维人员对Mysql中的配置信息可能有误操作,所以在加载到共享内存前,程序有严格的合法性校验,以免取得错误的配置信息,破坏集群的安全。

那么合法性校验具体是怎么做的呢?

a. 把所有配置加载到待更新的临时配置中 b. 把数组两个元素指向的配置,进行一个比对,这里检验条件就非常多了,如新加的server_id与旧的server_id要行程等差数列,server_name相同的条目copy_id必须不同等等,这里就不一一列举。

代码语言:javascript
复制
//下面这个结构体,我们看到astServerConfigs数组可以保存两个元素,一个保存在用的配置,另一个保存待更新的临时配置
typedef struct 
{
 int dwNowConfIndex; //指向最新更新的配置共享内存的指针 
 ServerConfigs astServerConfigs[2]; 
} ServerConfigsShm;

配置合法性检查不通过,上报"299726 配置中心装载新配置时,因检查不合格而拒绝装载"。这是一个比较重要的上报,因为同时变量g_bDbConfigIsValid = false,后面介绍的[推送配置]模块,如果发现该变量为false,则会终止推送,那么诸如主备切换等新集群信息,也无法下发了,所以需要尽快处理。而我们的运营系统是通过看日志来检测配置是否加载成功的。成功的话,sleep(10)后会把 成功日志写到文件中。

代码语言:javascript
复制
LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "LoadConfigThread LoadConfigFromDB succ,ddwSeq:%ld", ddwSeq);

如果失败,不用sleep任何秒数,直接就会写失败日志:LOG_WATER_MUTEX(&g_stServerLog,_LOG_ERROR_, "LoadConfigFromDB Fail: iRet=%d", iRet);

所以我们校验加载配置是否失败,只需要在数据库update seq后,过1秒后,检验日志文件,是否同时出现了ERR以及LoadConfigFromDB 字眼,如果是,那么就是加载配置失败了。

假如配置的md5的确是发生了改变,那么线程会生成一个配置包待下发,这是因为有时seq发生了改变不一定意味着配置有改变,所以还需看MD5。生成配置包失败会上报"164304 load数据生成配置包失败",成功会上报"182928 server启动,生成配置"。

然后进行临时内存结构体与运行内存结构体的切换,以达到启用新配置的目的,

代码语言:javascript
复制
g_stConfig.pstServerConfigsShm->dwNowConfIndex++;
g_stConfig.pstServerConfigsShm->dwNowConfIndex %= 2;
memcpy( (void*)g_stConfig.pstVServerClientUpdateInfoList->astOneVServerClientUpdateInfo,
stTmpVServerClientUpdateInfoList.astOneVServerClientUpdateInfo,
stTmpVServerClientUpdateInfoList.dwServerNum*sizeof(OneVServerClientUpdateInfo) );
g_stConfig.pstVServerClientUpdateInfoList->dwServerNum = stTmpVServerClientUpdateInfoList.dwServerNum;

最后我们还会更新由新配置生成的md5信息。

代码语言:javascript
复制
memcpy((char*)g_stConfig.pstServerCommInfo->strMD5Hash,strMD5Hash,16);

上述我们看到了ServerConfigsShm结构体,这里我也把相关的需要参考的结构体一并列出。

// 管理所有配置信息的数据结构

代码语言:javascript
复制
typedef struct
{
 VServerConfigList stVServerConfigList;
 IPVServer stIPVServer;
 ServerNameServer stServerNameServer;
 MicsInfoList stMicsInfoList;
 InterfaceIpList stInterfaceIpList;
 AppMicsInfoList stAppMicsInfoList;
} ServerConfigs;
代码语言:javascript
复制
// 管理所有v_server的数据结构
typedef struct 
{
 uint32_t dwVServerNum; //有多少个虚拟机
 OneVServerConfig astOneVServerConfig[MAX_VIRTUAL_COUNT_PER_COPY*MAX_COPY_NUM]; //保存所有虚拟机数据结构的数组
} VServerConfigList;
#define MAX_VIRTUAL_COUNT_PER_COPY 20000 // 单拷贝最大虚拟机数量
#define MAX_COPY_NUM 12 // 最大拷贝数量

// 一个虚拟机的数据结构
typedef struct 
{
 uint32_t dwServerId; // server id, 主键
 uint8_t sServerName[32]; // Server 名字,用于做hash用。多份拷贝的机器采用一一对应方式,一一对应的机器名字相同。这样前端寻址时候hash值才能一样。
 uint32_t dwIPAddr; // 存储机Ip地址,存储的时候用主机序来存储
 uint16_t wVNumber; // 该虚拟机在物理机器中的编号
 uint16_t wRole; // 角色 ,同gro_api.h中的ROLE_TYPE 
 uint16_t wVirtualNodeNum; // 该机器的虚拟节点个数
 uint16_t wIdcId; // 所归属的idc id
 uint16_t wProcNum; // 每个虚拟机器的进程数目 
 uint16_t wWorkFlag; // 工作状态配置,人工设置是否处于运行状态,和gro_api.h中STATE_TYPE定义相同 
 uint16_t wIfScale; // 扩容标记 0--非扩容,1--扩容中 
 uint16_t wRwsFlag; // 读写同步配置,人工设置是否处于读写或者同步
 uint16_t wMaskFlg;
 uint16_t wMaster; 
 uint16_t wWorkingState; // 机器上报运行/非运行状态 
 uint16_t wCopyId; // 拷贝id
#if defined(MEDIA_IS_MEM)
 uint32_t dwMaxAllowReads;
 uint32_t dwMaxAllowWrites;
#elif defined(MEDIA_IS_SSD)
 uint8_t sDevName[32]; // ssd盘设备名,如"dev/sdj",互为备份的VServer该字段可以不同
#else
#error "MEDIA_IS_MEM or MEDIA_IS_SSD must be defined!"
#endif
 uint8_t sReserved[64]; 
}OneVServerConfig;

// v_server客户端的更新信息
typedef struct 
{
 uint32_t dwServerNum;
 OneVServerClientUpdateInfo astOneVServerClientUpdateInfo[MAX_VIRTUAL_COUNT_PER_COPY*MAX_COPY_NUM]; 
} VServerClientUpdateInfoList;

//一条v_server客户端的更新信息
typedef struct 
{
 uint32_t dwServerId; // 虚拟机编号
 time_t dwLastUpateTime; //最后上报时间,用于判断存活心跳
 uint32_t dwLastUpdatePriority; //最后上报时客户端配置的优先级
 uint8_t sMD5Hash[16]; //client最近上报的md5值 
 time_t dwLastPushConfigTime; //配置最后推送时间
 uint8_t sReserved[32]; // 保留4字节
} OneVServerClientUpdateInfo;

//管理所有 interface 机器的配置更新信息
typedef struct 
{
 uint32_t dwServerNum;
 OneInterfaceServerClientUpdateInfo astOneInterfaceServerClientUpdateInfo[MAX_INTERFACE_MACHINE_COUNT]; 
} InterfaceServerClientUpdateInfoList;

// interface 机器信息
typedef struct 
{
 uint32_t dwIP; // ip
 time_t dwLastUpateTime; //最后上报时间
 uint32_t dwLastUpdatePriority; // 最后上报时客户端配置的优先级
 uint8_t sMD5Hash[16]; // client最近上报的md5值 
 time_t dwLastPushConfigTime; //server 给client 下发配置的最新时间
 uint8_t sReserved[32];
} OneInterfaceServerClientUpdateInfo;

typedef struct 
{
 uint16_t wIPNum; 
 OneIPVServer astOneIPVServer[MAX_HARD_COUNT_PER_COPY*MAX_COPY_NUM];
}IPVServer;
#define MAX_HARD_COUNT_PER_COPY 20000 // 单拷贝最大物理机器数量
#define MAX_COPY_NUM 12 // 最大拷贝数量

// 物理机器ip 和虚拟机id 的对应关系
// 便于根据ip寻找虚拟机器
typedef struct 
{
 uint32_t dwIP; 
 uint16_t wVServerNum;
 uint32_t dwServerIdList[MAX_VIRTUAL_NUM_PER_MACHINE];
}OneIPVServer;
#define MAX_VIRTUAL_NUM_PER_MACHINE 20 // 每台物理机器最多有多少台虚拟机器

备注:集群的一台物理存储机,逻辑上会划分为1个或多个VSERVER,每个VSERVER对于集群就是一个独立的存储机,独立提供服务,这有点虚拟化的意思。

存活更新

该模块由线程UpdateVServerStatusThread单独实施,会跟进客户端状态列表中机器的心跳情况,来更新客户端状态,可能是从WORKDING->OFF_WORKING,或者从OFF_WORKING->WORKDING。改变状态后,还会产生推送配置包并放置到推送队列中。

逻辑包裹在while循环中,循环间睡眠间隔0.01秒,usleep(10000) 遍历每一个VSERVER,获取其最后一次上报与当前的时间差距(秒),

代码语言:javascript
复制
tInterval = TimeDiffSeconds(tCur, pstOneVServerClientUpdateInfo->dwLastUpateTime);

如果 tInterval 已经大于某个伐值(如3分钟): 如果当前该VSERVER我们记载是WORKING状态,那么我们就就要将其转为OFF_WORKING了。 如果是OFF_WORKING,则什么都不用做。 如果 tInterval 小于等于某个伐值(如3分钟),这说明了已经有心跳上报了: 如果当前是OFF_WORKING,则转为WORKING。 如果当前是WORKING,则什么都不用做。

按照如上逻辑:死了的机器是过一段时间(可配)才会被置为DEAD,后面的请求才不会转发到这,但机器如果复活了,可能不到1秒就能判断其活了。这就是状态转换的时间差别。

代码语言:javascript
复制
// 如果间隔时间超过伐值
 if (tInterval > g_stConfig.iReportStatusInterval*g_stConfig.iReportStatusTryTimes) //这两个配置参考上述初始化时从配置文件读入的项
 { 
 // 当前是WORKING的
 if (WORKING== pstOneVServerConfig->wWorkingState)
 {
 aiChangeServer[iChangeSvrNum] = pstOneVServerConfig->dwServerId;
 iChangeSvrNum++;
 tm* pTM = NULL;
 char szTime1[256] = {0};
 char szTime2[256] = {0};
 pTM = localtime((time_t*)&pstOneVServerClientUpdateInfo->dwLastUpateTime);
 sprintf(szTime1,"%04d-%02d-%02d %02d:%02d:%02d",pTM->tm_year+1900,pTM->tm_mon+1,pTM->tm_mday,pTM->tm_hour,pTM->tm_min,pTM->tm_sec);
 pTM = localtime((time_t*)&tCur);
 sprintf(szTime2,"%04d-%02d-%02d %02d:%02d:%02d",pTM->tm_year+1900,pTM->tm_mon+1,pTM->tm_mday,pTM->tm_hour,pTM->tm_min,pTM->tm_sec);
 uint32_t dwNIPAddr = htonl(pstOneVServerConfig->dwIPAddr);
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "xxxx001 update state WORKING-->OFF_WORKING,vid:%u tInterval:%u dwLastUpateTime:%u %d %d time2:%s time1:%s ip:%s ",pstOneVServerConfig->dwServerId,tInterval,pstOneVServerClientUpdateInfo->dwLastUpateTime,g_stConfig.iReportStatusInterval,g_stConfig.iReportStatusTryTimes,szTime2,szTime1,inet_ntoa_MT(*(struct in_addr*)&dwNIPAddr).c_str()); 
 }
 }
 else //如果最近短时间内就有过心跳上报
 { // 如果是OFF_WORKING的话,我们就要考虑把他置为WORKING了。
 if (OFF_WORKING== pstOneVServerConfig->wWorkingState)
 {
 aiChangeServer[iChangeSvrNum] = pstOneVServerConfig->dwServerId;
 iChangeSvrNum++;
 tm* pTM = NULL;
 char szTime1[256] = {0};
 char szTime2[256] = {0};
 pTM = localtime((time_t*)&pstOneVServerClientUpdateInfo->dwLastUpateTime);
 sprintf(szTime1,"%04d-%02d-%02d %02d:%02d:%02d",pTM->tm_year+1900,pTM->tm_mon+1,pTM->tm_mday,pTM->tm_hour,pTM->tm_min,pTM->tm_sec);
 pTM = localtime((time_t*)&tCur);
 sprintf(szTime2,"%04d-%02d-%02d %02d:%02d:%02d",pTM->tm_year+1900,pTM->tm_mon+1,pTM->tm_mday,pTM->tm_hour,pTM->tm_min,pTM->tm_sec);
 uint32_t dwNIPAddr = htonl(pstOneVServerConfig->dwIPAddr);
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "xxxx002 update state OFF_WORKING-->WORKING,vid:%u tInterval:%u dwLastUpateTime:%u %d %d time2:%s time1:%s ip:%s ",pstOneVServerConfig->dwServerId,tInterval,pstOneVServerClientUpdateInfo->dwLastUpateTime,g_stConfig.iReportStatusInterval,g_stConfig.iReportStatusTryTimes,szTime2,szTime1,inet_ntoa_MT(*(struct in_addr*)&dwNIPAddr).c_str());
 }
 }

从上述代码我们能看出来,需要进行状态转换的VSERVER,都会把serverid被加到aiChangeServer数组中,所有serverid都加到aiChangeServer数组以后,我们就对该数组进行遍历,并且把对应VSERVER的状态进行变换。

代码语言:javascript
复制
// 更新状态
 for (i = 0; i < iChangeSvrNum; i++)
 {
 iTmpSrvIndex = aiChangeServer[i];
 pstOneVServerConfig = (OneVServerConfig* )FindVServerConfigById(iTmpSrvIndex,pstVServerConfigList);
 if (NULL == pstOneVServerConfig)
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "FindVServerConfigById err, iTmpSrvIndex:%d ",iTmpSrvIndex);
 Attr_API(183911,1); //根据serverid获取server信息失败
 continue;
 }
 pstOneVServerClientUpdateInfo = (OneVServerClientUpdateInfo* )FindOneVServerClientUpdateInfoById(iTmpSrvIndex, pstVServerClientUpdateInfoList);
 if (NULL == pstOneVServerClientUpdateInfo)
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "FindOneVServerClientUpdateInfoById err, iTmpSrvIndex:%d ",iTmpSrvIndex);
 Attr_API(183911,1); //根据serverid获取server信息失败
 continue;
 }
 // 如果当前是WORKDING,那么就改为OFF_WORKING
 if (WORKING== pstOneVServerConfig->wWorkingState)
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "update state WORKING-->OFF_WORKING,vid:%u ",iTmpSrvIndex);
 pstOneVServerConfig->wWorkingState = OFF_WORKING;
 IsUpdateStatus = 1;
 }
 // 如果当前是OFF_WORKDING,就改为WORKING
 else
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_NOTICE_, "update state OFF_WORKING-->WORKING,vid:%u ",iTmpSrvIndex);
 pstOneVServerConfig->wWorkingState = WORKING;
 IsUpdateStatus = 1;
 }
 }

如果配置加载模块,该线程最后会更新服务器配置的MD5值,以及为新的配置生成配置包,并推送到队列中。

筛选机器

线程 PushScheduleThread 负责筛选出需要发送配置的机器,从这里我们知道,配置并不是需要推送到所以机器上的,而之后客户端状态的MD5与服务器状态MD5有差异时,我们才会推送配置给该客户端。这个模块主要就是用来筛选出需要接受配置包的机器。

业务逻辑都在while循环中,循环至少间隔2秒 : sleep(2) 获取当前消息队列 g_MsgHandle 中,未发送的消息数量(机器数量)有多少。这里失败上报"164324 推送调度,获取消息队列数据失败",成功上报"164325 推送调度,消息队列数据还没有处理完"。

如果消息数量不为0,也就是上一轮发送还未完,那么放弃本轮循环的操作,continue进入下一次循环。

如果消息数量为0,也就是之前的消息(机器)都处理完了,那么我们这次就来看看,是否有需要接收配置信息的客户端。

把这个结构体推送至消息队列 g_MsgHandle,START表明这里开始新的一轮,宏定义如下。失败会上报"推送调度cache,将cache放入消息队列失败"。

代码语言:javascript
复制
PutFlagToMsgQueue(START,(char*)strMD5Hash); 
typedef struct 
{
 uint8_t cRole=宏START值是0
 uint32_t dwIP=0
 uint8_t strMD5Hash[16]=服务器最新配置16位md5
} PushConfigInfo;

typedef enum 
{
 START = 0,
 CFG_CLIENT_FOR_CACHE = 1, 
 CFG_CLIENT_FOR_IF = 2,
 CFG_CLIENT_FOR_CACHE_AND_IF = 3,
 END = 4,
} ROLE_TYPE;

接下来还会执行

代码语言:javascript
复制
PutInterfaceClientToMsgQueue(strMD5Hash,tCur,&dwNumber2);

具体的逻辑是,遍历 interface 机器的客户端状态列表 pstInterfaceServerClientUpdateInfoList , 如果发现其最后上报心跳时间超时了,说明可能死机了,那么将其上报md5置0,这样MD5不一致会导致配置一直往其发送,这样后续机器复活后也能获得新配置。

如果发现最后上报心跳时间没有超时,比较interface上报的MD5与服务器保存的MD5;

如果一致的话,说明无需推送interface配置已经是最新;

如果不一致,那么上报"184114 推送interface配置,client优先级较低",并将如下结构体压入消息队列,并且将该interface配置的最后推送时间置为当前时间;过程出现问题上报"164329 推送调度interface,将interface放入消息队列失败"

代码语言:javascript
复制
typedef struct 
{
 uint8_t cRole=宏CFG_CLIENT_FOR_IF值是2
 uint32_t dwIP=interface的IP
 uint8_t strMD5Hash[16]=服务器最新配置md5
} PushConfigInfo;

接下来还会执行

代码语言:javascript
复制
PutCacheServerToMsgQueue(strMD5Hash,tCur,&dwNumber1);

具体的逻辑是,遍历所有 cache 机器,获取每个cache机器上的第一个serverid(一个机器可能有若干个serverid),根据这个serverid,获取该VSERVER的服务器配置与客户端状态信息。如果该机器当前是OFF_WORKING状态,那么很可能该机器就死机了,就将其MD5值置空,以防止机器重启后一直获取不到配置的问题。这里与上述遍历interface机器的机器的逻辑有点相似,但上述是要计算是否已经超时,而cache机器本身就有WORKING与OFF_WORKING状态,就不用再计算。 然后就对比服务器与客户端的MD5值,

如果一致的话,说明无需向该CACHE推送配置,并且continue到下一个循环。

如果不一致,则说明该CACHE需要获取最新的配置了,我们把这个结构体压进消息队列:

代码语言:javascript
复制
typedef struct 
{
 uint8_t cRole=宏CFG_CLIENT_FOR_CACHE值是1
 uint32_t dwIP=cache的IP
 uint8_t strMD5Hash[16]=服务器最新配置md5
} PushConfigInfo;

压入消息队列成功后,还需要更新该ip下所有 VSERVER 的 最后推送配置时间 dwLastPushConfigTime 更改成当前时间。

如果上面的确有需要推送的机器被压入队列,那么就把END与md5值压到队列中,否则把END与空的md5值压入到队列中。所以你猜到了,后面的[推送模块],看到END这个cRole后,还得判断MD5值是否全0,来判断是否有要推送的机器。

代码语言:javascript
复制
 // 如果有需要推送的机器,把END标记也压到消息队列中
 if (dwNumber > 0)
 {
 PutFlagToMsgQueue(END,(char*)strMD5Hash);
 }
 // 如果没有需要推送的机器,也把END标记压到消息队列中,然后把为0的md5值也压入.
 else
 {
 uint8_t strTmpMD5Hash[16];
 memset((char* )strTmpMD5Hash,0,16);
 PutFlagToMsgQueue(END,(char*)strTmpMD5Hash);
 }

typedef struct 
{
 uint8_t cRole=宏END值是4
 uint32_t dwIP=0
 uint8_t strMD5Hash[16]=服务器最新配置md5 或者 '0000000000000000'
} PushConfigInfo;

配置推送

这个模块是将最新的配置信息,推送到 [筛选机器]模块中指定的机器上。这个模块有个特点,为了加速配置的推送,会创建很多线程,数量根据配置文件中的配置项 PushConfigThreadNum 来定(一般是200)

代码语言:javascript
复制
for (i=0;i<g_stConfig.iPushConfigThreadNum;i++)
{
 iRet = pthread_create(&pid, NULL, PushConfigThread, NULL);
 if (iRet)
 {
 LOG_WATER_MUTEX(&g_stServerLog,_LOG_ERROR_, "Create PushConfigThread error!\n");
 return -4;
 }
}

PushConfigThread()中具体的逻辑是什么呢? 线程会不断地循环,每次循环会固定睡眠0.01秒:usleep(10000); 然后会获取消息队列中的一个数据结构(消息结构如上),不能读出(消息队列为空),则sleep(1),读出的话,解释出其IP地址,再解释其cRole字段。

如果cRole = START,这说明还没有需要推送的机器,continue到下一个循环。

如果cRole = END,那么说明本轮配置已经全部下发。这里会比较这个消息结构的MD5与服务器配置的MD5是否一致,

如果不一致(也就是全0的情况):那么说明本轮其实并没有需要下发配置的机器,continue到下个循环。

如果一致,说明本轮确认是进行过机器的配置下发。那么检查一下配置推送的结果,主要看: a. 成功推送配置的数量 ; b.应该推送的数量, c.没有推送错误的数量,如果a小于b(很可能别的线程在发送并且未发送完),并且没有c(错误量)的发生,那么我们稍微等一段时间(可配)。最后将这三者的结果打印日志,以及入库与上报Monitor等等。我们查问题是:select cfg_seq,finish_time,success from T_DISTRIBUTE_STATUS来看某次具体的下发是否成功,success是0代表成功,1代表失败。

如果非START也非END,那么向刚才解释出来的IP,客户端端口,推送配置。这才是最重要的。推送失败上报"626449 推送配置到指定IP失败",并且把对应error结果加1.如果成功,把对应success结果加1.推送成功,会记录推送的耗时时间并上报。

"182940 server下发配置时间在0-1" //单位毫秒,书醒从182940至182956都是记录推送时间的。

《 腾讯云 Redis 集群版配置管理揭秘 ( 下 )》

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导语
  • 配置加载
  • 存活更新
  • 筛选机器
  • 配置推送
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档