大家好,我是老李,今天我真的不讲段子了,我发誓!
上一节主要内容是分析Workerman进程管理相关内容的源码,今天是完结篇,主要内容是reload、fork还有monitor三部分内容。
今天,让我们还从runAll()函数作为入口点开始:
public static function runAll() {
static::checkSapiEnv();
static::init();
static::lock();
// 对start、stop、restart、reload动作的解析
// 这个函数我就不单独解析了,比较简单
static::parseCommand();
// 这个没什么好说的,就是daemon化
static::daemonize();
static::initWorkers();
// 安装信号
static::installSignal();
static::saveMasterPid();
static::unlock();
static::displayUI();
// fork出配置数量的Worker进程
static::forkWorkers();
static::resetStd();
// 监控Worker进程
static::monitorWorkers();
}
forkWorkers()顾名思义,下面是TA的具体代码实现,非常简单:
protected static function forkWorkers() {
// 这里逻辑就比较简单了,根据操作系统不同,分别走了两个
// 不同的fork方法
// windows我就不看了,我手里也没有windows
// Linux下的才是正道,所以下一步集中火力到
// forkWorkersForLinux()函数上!
if (static::$_OS === \OS_TYPE_LINUX) {
static::forkWorkersForLinux();
} else {
static::forkWorkersForWindows();
}
}
protected static function forkWorkersForLinux() {
// static::$_workers是什么,注意,我再次强调这个里并不是
// worker进程,而是指Worker实例,比如你在用WM的时候,同时
// 启动了两个Worker
// $http_worker_1 = new Worker("http://0.0.0.0:2222");
// $http_worker_2 = new Worker("http://0.0.0.0:1111");
// static::$_workers数组中存储的则是这两个http worker实例
foreach (static::$_workers as $worker) {
// 此处逻辑比较简单,worker实例有一个属性叫做name,就是
// 你可以给这个worker起一个名字
// 如果你不显式地给name属性赋值,默认是none
// 如果彻底为空,WM会把当前Worker实例d的socketname
// 赋值给name,比如http_worker_1实例的socketName就是
// http://0.0.0.0:2345
if (static::$_status === static::STATUS_STARTING) {
if (empty($worker->name)) {
$worker->name = $worker->getSocketName();
}
$worker_name_length = \strlen($worker->name);
if (static::$_maxWorkerNameLength < $worker_name_length) {
static::$_maxWorkerNameLength = $worker_name_length;
}
}
// 还记得Worker对象的count属性吗?此处按照count属性数量fork出
// 固定数量的worker进程,此处并没有用for循环来实现,而是使用while
// 循环。每当fork成功一个worker子进程,就会将pid保存到static::$_pidMap[$worker->workerId]
// 中去,当count发现当前Worker实例中子Worker进程数量少于count属性
// 时,就持续while
// 再次强调!⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️
// 切记!!切记!!Worker实例 不是 Worker子进程!!!
// $worker = new Worker("http://0.0.0.0:2222") 就是一个
// worker实例,该worker实例拥有一个Master进程,Master进程需要
// fork出count属性数目个的Worker子进程!!
//⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️
while (\count(static::$_pidMap[$worker->workerId]) < $worker->count) {
static::forkOneWorkerForLinux($worker);
}
}
}
// 所以,真正的pcntl_fork()在forkOneWorkerForLinux()方法里
protected static function forkOneWorkerForLinux(self $worker) {
// Get available worker id.
// 这个地方比较有意思,乍一看有点儿神经病
// 实际上你把这个getId函数看作是一个产生id的函数就行
// $id是表示的是:一个Worker实例中每一个子worker进程的
// id,比如有fork了4个子worker进程,那么这四个子worker进程
// 的$id就是0 1 2 3 ~ 明白了哈~~
$id = static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
// 这句没啥好说的吧???。。。前面基础章节w我可不少说
$pid = \pcntl_fork();
// For master process.
// $pid > 0分支,表示是父进程d的分支
// 等于0的分支表示是 子进程的分支
if ($pid > 0) {
// 父进程主要就是保存 $_pidMap 和 $_idMap 两个数组即可
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
} // For child processes.
// 可能有人不明白为什么要写成 0 == $pid
// 很简单,$pid == 0也没问题,但是,如果你
// 在写$pid == 0的时候,一不小心手贱少写了一个=
// 那就变成 $pid = 0,我可告诉你,这会儿任何编辑器
// 都不会报错,程序照常运行,但是至于逻辑对不对,呵呵哒~~~
elseif (0 === $pid) {
// ??????????
// ??????????
// 这两个沙雕函数是干啥的?也没个返回值...
// 注意:此处我没有验证,属于个人猜测,纯属猜测
// 众所周知,计算机中的随机数都是伪随机数,并不是
// 真正的随机。之前可能有过Worker多个子进程产生伪随机数
// 相同的情况,因为worker进程fork出来后应该是继承的父进程
// 的随机种子。所以,这里呢,先用srand将每个worker子进程
// 的种子先打乱一下,估计这样的话,后面在业务逻辑里写
// 获取伪随机数的代码应该就会得到不同的伪随机数了
// 瞎TM猜,如果不对了,你也没辙...
// 你们试下吧,好吧?
\srand();
\mt_srand();
// 这里是socket部分,如果端口复用,就直接listen
// 这里listen就是socket中的listen
// 这里部分我们到后面socket时候再详细说
if ($worker->reusePort) {
$worker->listen();
}
// resetStd()函数里,对各种输入、输出,该关闭的关闭
// 该重定向的重定向...
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
// Remove other listener.
// 这里是啥意思呢?
// 留到socket的时候再讲吧.
foreach(static::$_workers as $key => $one_worker) {
if ($one_worker->workerId !== $worker->workerId) {
$one_worker->unlisten();
unset(static::$_workers[$key]);
}
}
// 清除掉所有原来的定时器.
Timer::delAll();
// 给进程起一个名字,方便在ps -ef的时候辨别
// 还记得我去宇宙条那道口头面试题么?感受一下...
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
// run()函数中,就是 event-loop 了!
// 这里我们到socket的时候,再详细
$worker->run();
$err = new Exception('event-loop exited');
static::log($err);
exit(250);
} else {
throw new Exception("forkOneWorker fail");
}
}
所以fork这里的流程就是这样咯~我们应该学会主动忽略细枝末节或者当前对我们暂不重要的地方,这里有个方法论就是:
要知道当前阶段的主要矛盾,当前主要矛盾是进程,而不是socket
好了,当Master进程按照配置将子Worker进程fork出来后,Master进程需要对这一坨子进程进行监管了,也就是MoniterWorkers()函数咯~你们感受一下:
// 不解释了...
// 这要再解释了,我都感觉是在侮辱诸君了
protected static function monitorWorkers() {
if (static::$_OS === \OS_TYPE_LINUX) {
static::monitorWorkersForLinux();
} else {
static::monitorWorkersForWindows();
}
}
// 直接跳到monitorWorkersForLinux()~
protected static function monitorWorkersForLinux() {
// 首先将整个worker实例的状态设置为running ~
static::$_status = static::STATUS_RUNNING;
// 这里就比较好玩了,workerman是如何保证程序在后台持续运行
// 不退出的呢?
// 记住了,这里要划线,要考!
// Master主进程就靠下面这个可能令你惊讶的while 1来搞定
// Worker子进程就众所周知全靠event-loop来保证了
// 今天主要矛盾是 Master进程的while
// Master进程就靠while保证不退出并持续对Worker进程进行
while (1) {
// Calls signal handlers for pending signals.
// 这个,没啥好说的吧???
// 派发信号,说白了就是让信号监听函数生效
// 前面基础讲过
\pcntl_signal_dispatch();
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status = 0;
// 这个也没啥好说的吧
// 使用wait来回收子进程,避免僵尸进程
// 前面基础讲过...
$pid = \pcntl_wait($status, \WUNTRACED);
// Calls signal handlers for pending signals again.
// 再次派发信号,使用pcntl_signal_dispatch就是有
// 这么个缺陷,需要在上次调用生效后,立马在dispatch起来
// 不然只有第一次生效,后面统统不生效
// 前面基础章节讲过
\pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
// 下面这个foreach循环,别看代码一大坨
// 实际上,贼没意思
// 因为一个worker子进程已经退出了,所以
// 这里要把之前的$_pidMap等数组中信息中与刚死掉
// 的那个worker子进程相关的所有信息全部清理掉
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker = static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
++static::$_globalStatistics['worker_exit_info'][$worker_id][$status];
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
// Mark id is available.
$id = static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
break;
}
}
// Is still running state then fork a new worker process.
// 这里比较有意思。你要考虑到的是,worker进程退出可能并不因为是
// 停止服务,还有一种情况就是reload,又或者worker子进程响应过
// xxx次请求后自动销毁并拉起一个新的(不知道wm有没有这功能),
// 这些情况下除了要回收垃圾信息外,到最后还要再重新拉起一个新的
// worker子进程来补充进来。
// Workerman里如何判定是【停止服务】还是【热加载服务】呢?
// 就靠status状态属性咯~~~
if (static::$_status !== static::STATUS_SHUTDOWN) {
// 这里继续通过 forkWorker()函数来fork子进程
// 还记得吗?是通过while循环+判断count属性来实现的
static::forkWorkers();
// If reloading continue
// ... 这里有个reload,但是reload如果要运行,有一个条件
// 那就是当前退出的worker子进程必须要在 $_pidsToRestart
// 数组中
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
// 啊哈哈,重点在这里
// 每当一个子进程退出后,并且该子进程在$_pidsToRestart
// 数组里,那么Master进程就要执行一下reload,所以reload
// 里有什么?
static::reload();
}
}
}
// If shutdown state and all child processes exited then master process exit.
// 如果是真要退出服务了。。。清理所有垃圾数据,然后再退出
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
正如你所看到的这样,我单独把reload()拿出来了,因为reload的逻辑非常绕,我估计已经绕到弄不好亮哥自己都想不起来咋回事了...这里我们必须要从php index.php reload操作开始说起,不然真的就说不清楚了。
我们得先感受下输入php index.php reload后的整体是啥样的...我用灵魂手法做了一个不错的流程图,你们感受下:
能感受到么?感受不到就看代码吧...
protected static function reload() {
// For master process.
// 当 reload 的时候,master进程和worker进程的响应d动作应该是
// 不一样的,所以下面得分开处理对待
if (static::$_masterPid === \posix_getpid()) {
// Set reloading state.
// 下面这一坨别看多。。。实际上还好
// 主要目的就是 一、设置当前状态为reloading
// 二、通过call_user_func触发onWorkerReload回调函数
if (static::$_status !== static::STATUS_RELOADING && static::$_status !== static::STATUS_SHUTDOWN) {
static::log("Workerman[" . \basename(static::$_startFile) . "] reloading");
static::$_status = static::STATUS_RELOADING;
// Try to emit onMasterReload callback.
if (static::$onMasterReload) {
try {
// 这个技巧值得注意!
\call_user_func(static::$onMasterReload);
} catch (\Exception $e) {
static::log($e);
exit(250);
} catch (\Error $e) {
static::log($e);
exit(250);
}
static::initId();
}
}
// 这个。。。你们一定能看懂是
if (static::$_gracefulStop) {
$sig = \SIGQUIT;
} else {
$sig = \SIGUSR1;
}
// Send reload signal to all child processes.
$reloadable_pid_array = array();
// $_pidMap的结构我就不再赘述了
// 使用foreach遍历所有Worker实例
// 这里就是要把当前所有Worker实例的所有worker子进程pid全部
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
// 当前worker实例
$worker = static::$_workers[$worker_id];
// reloadable这个属性的意思是我有必要说下:
// 设置当前Worker实例是否可以reload,即收到reload信号后是否退出重启。
// 不设置默认为true,收到reload信号后自动重启进程
if ($worker->reloadable) {
foreach ($worker_pid_array as $pid) {
$reloadable_pid_array[$pid] = $pid;
}
} else {
foreach ($worker_pid_array as $pid) {
// Send reload signal to a worker process which reloadable is false.
\posix_kill($pid, $sig);
}
}
}
// Get all pids that are waiting reload.
// 这个地方非常非常有意思,因为这里这个数组将会成为结束
// 执行的条件
static::$_pidsToRestart = \array_intersect(static::$_pidsToRestart, $reloadable_pid_array);
// Reload complete.
// 看到没?如果说$_pidsToRestart已经是空了,reload方法就彻底
// 结束执行了
if (empty(static::$_pidsToRestart)) {
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::$_status = static::STATUS_RUNNING;
}
return;
}
// Continue reload.
// 这里也是关键,奇怪,为什么一个current方法,就能Continue reload.
// 呢??关键看下一行!
$one_worker_pid = \current(static::$_pidsToRestart);
// Send reload signal to a worker process.
// 啊哈,想子进程pid发送signal,这里的signal不是sigquit就是sigusr1
// 你去信号捕捉那里看下,当捕捉到sigquit就是sigusr1后
// 信号处理器里就会再次执行static::reload()!
// 说白了就是 static::reload()中Master代码段+signal-handler
// 共同组成了一个循环逻辑!
// 而结束这个循环的条件就是static::$_pidsToRestart数组w为空!
\posix_kill($one_worker_pid, $sig);
// If the process does not exit after static::KILL_WORKER_TIMER_TIME seconds try to kill it.
// !保证子进程一定被干挺了!
if(!static::$_gracefulStop){
Timer::add(static::KILL_WORKER_TIMER_TIME, '\posix_kill', array($one_worker_pid, \SIGKILL), false);
}
} // For child processes.
else {
\reset(static::$_workers);
$worker = \current(static::$_workers);
// Try to emit onWorkerReload callback.
if ($worker->onWorkerReload) {
try {
\call_user_func($worker->onWorkerReload, $worker);
} catch (\Exception $e) {
static::log($e);
exit(250);
} catch (\Error $e) {
static::log($e);
exit(250);
}
}
// 子进程执行realod()的时候,如果reloadable会true
// 就会exit自己,exit自己会让Master进程收到sigchld信号
// master进程中monitorWorkers()方法的pcntl_wait会开始执行
// 然后Master进程知道有子进程退出后,会做下善后工作,
// 具体上面 MoniterWorkers方法已经看过了
// 然后Master进程就会再fork个新的子进程出来顶替原来exit
// 掉的子进程!
if ($worker->reloadable) {
static::stopAll();
}
}
}
好了!reload()方法分析完毕!不出意外应该需要消化一阵儿了,我给大家的建议就是打开WM源码,打断点结合日志然后加上上面的东西,仔细跑几遍流程基本上就能绕过来了。
而我也如约憋住了自己:今天一个段子也没讲。最后送给大家一首诗吧,来自于《遥远的救世主》中男主的一首打油诗:
无题 --- 丁元英
本是后山人,偶做前堂客,醉舞经论半卷书,坐井说天阔! 大志戏功名,海斗量福祸,轮到囊中羞涩时,怒指乾坤错!
2019年12月20日 01时55分23秒