在线上项目中,很多时候需要起一个daemon做守护进程,用于不停地或以一定间隔地执行工作,比如每隔20s把内存中的数据做快照写磁盘。
一些项目中的daemon是以单机仅运行一个实例的方式存在的。这样做的理由主要有:
(1)很多daemon没必要多机同时运行,因为这样会重复做一些工作,意义不大且浪费性能。
(2)有些daemon多机同时运行是会出问题的,比如上报数据,转账等,一旦重复会导致很多竞争和不一致的问题。
但另一方面,单机运行daemon又很容易造成单点故障的问题,无法做到多机容灾。于是在这样的背景下,我们基于zookeeper设计了这个daemon框架,利用分布式锁的概念和心跳监控等措施,保证了多机环境下同一时间有且仅有一个daemon正在运行,同时监控daemon的运行情况,及时告警。
整个daemon框架主要是由业务daemon模块,zookeeper集群和platform_daemon_alive_monitor监控模块三大部分构成。
一个daemon模块由多个daemon实例构成,每个daemon实例由一个master加一个或多个worker构成。daemon本身是采用主从模型,master初始化和维护一些基础配置数据,并负责和zookeeper集群通信;接着fork出多个worker子进程来执行工作,同时监控子进程的执行:在worker退出时负责回收,并在worker异常退出时告警且重新fork出足够数量的worker继续执行工作。
一个分布式协调服务的zookeeper集群,当中维护了一个“目录树”,并由内部分布式一致性协议保证了集群中各个节点上这棵树的内容是一致的,对zookeeper的操作其实就是对这棵树的操作。同时它也是采用的主从模型,由leader节点对外提供服务,follower节点做容灾备份,保证了其高可用性(存活节点数大于N/2即可正常运行)。
每个daemon启动后需要到这个zookeeper集群的目录树中注册一个有序临时节点,由这个临时节点充当了心跳的角色,只要这个临时节点还存在,则说明对应的daemon的master仍然存活,daemon尚可工作。同时因为各个daemon注册的临时节点是有序的,我们规定只有获得最小序号的那个daemon可以工作,其余的daemon要保持休眠状态,直到上一个daemon因为一些原因停止工作后才由次小序号的daemon接替工作,这样就保证了同一时间点只有一个daemon处于工作状态,并且不会产生惊群现象。如图所示:
platform_daemon_alive_monitor模块用于监控zookeeper上这颗目录树,每隔5分钟扫描一次这个目录树,把其中临时节点数少于N的daemon告警出来,我们当前把这个值设置为2,要求所有daemon至少部署2台。platform_daemon_alive_monitor目前是简单地用python脚本+crontab的方式来实现的。实际上,我们把这个脚本部署在了所有zookeeper集群上,让它同时监控了各台机器上zookeeper节点自身的运行情况,以便在zookeeper部分节点挂掉的情况下可以及时告警出来。
这里截取了daemon框架中master主流程的部分核心代码,当中体现了master去zookeeper上“抢锁”,抢锁失败则停止worker进程,成功则启动worker进程以及监控worker进程执行的整个流程。master在daemon启动后就一直循环执行这个过程直到退出。
其中_zookeeper_lock_switch参数是为了控制daemon是否必须抢锁成功(即自己必须是序号最小的那个daemon)才运行的开关。毕竟并不是所有daemon都需要单机执行,比如rocketmq的消费者daemon一般来说同时运行得越多越好。但无论_zookeeper_lock_switch是开是关,master一开始都需要调用ZK_PROXY_INSTANCE->distributed_lock方法去和zookeeper通信,因为它不仅是为了获取锁,同时也是向zookeeper注册当前daemon实例,证明自己“存活”的不可或缺的步骤。
void CDaemonMaster::RunForever()
{
while (!g_master_need_quit)
{
sleep(1);
// 无论是否打开锁开关都要加锁,这是为了监控daemon存活与否
string zk_cur_dis_node;
int ret = ZK_PROXY_INSTANCE->distributed_lock(_zookeeper_lock_root_path + _process_name + "/",
zk_cur_dis_node);
if (ret == EN_ZKPROXY_LOCKED)
{
// 加锁成功,可以执行主任务
}
else if (ret == EN_ZKPROXY_LOCK_RETRY)
{
// 如果必须加锁成功才能执行,则未加锁成功关闭所有子进程
if (_zookeeper_lock_switch)
{
StopWorkers();
continue;
}
// 如果不要求加锁成功,则可以执行主任务
}
else
{
ERROR_LOG("distributed_lock failed, ret:%d, msg:%s",
ret, ZK_PROXY_INSTANCE->get_err_msg());
if (!_keep_working_when_zk_error)
{
// zk发生错误,杀掉所有子进程,继续下一轮
StopWorkers();
continue;
}
else
{
// zk出错仍然工作
ret = 0;
}
}
// fork进程,开始工作
if(_worker_pids.size() < _worker_num)
{
StartWorkers();
}
while(true)
{
int status = 0;
pid_t pid = waitpid(-1, &status, WNOHANG);
if(pid < 0)
{
if(errno != ECHILD)
{
ERROR_LOG("waitpid error, msg=%s", strerror(errno));
}
break;
}
else if(pid == 0)
{
//DEBUG_LOG("no process exit");
break;
}
else
{
Alarm("[%s] child process dead", _process_name.c_str());
ERROR_LOG("child pid=%d exit unexpected! status=%d, WIFEXITED(status)=%d, WEXITSTATUS(status)=%d, WIFSIGNALED(status)=%d, WTERMSIG(status)=%d",
pid,
status,
WIFEXITED(status),
WEXITSTATUS(status),
WIFSIGNALED(status),
WTERMSIG(status));
_worker_pids.erase(pid);
}
}
}
// 停止工作
StopWorkers();
// 释放锁
ZK_PROXY_INSTANCE->distributed_lock_clean();
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。