首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >这次让我们真的读一下Workerman源码(七)

这次让我们真的读一下Workerman源码(七)

作者头像
老李秀
发布2019-12-25 14:44:48
1.2K0
发布2019-12-25 14:44:48
举报

大家好,我是老李,今天我真的不讲段子了,我发誓!

上一节主要内容是分析Workerman进程管理相关内容的源码,今天是完结篇,主要内容是reload、fork还有monitor三部分内容。

今天,让我们还从runAll()函数作为入口点开始:

public static function runAll() {    
        static::checkSapiEnv();
        static::init();
        static::lock();
        // 对start、stop、restart、reload动作的解析
        // 这个函数我就不单独解析了,比较简单
        static::parseCommand();
        // 这个没什么好说的,就是daemon化
        static::daemonize();
        static::initWorkers();
        // 安装信号
        static::installSignal();
        static::saveMasterPid();
        static::unlock();
        static::displayUI();
        // fork出配置数量的Worker进程
        static::forkWorkers();
        static::resetStd();
        // 监控Worker进程
        static::monitorWorkers();
}

forkWorkers()顾名思义,下面是TA的具体代码实现,非常简单:

protected static function forkWorkers() {
        // 这里逻辑就比较简单了,根据操作系统不同,分别走了两个
        // 不同的fork方法
        // windows我就不看了,我手里也没有windows
        // Linux下的才是正道,所以下一步集中火力到
        // forkWorkersForLinux()函数上!
        if (static::$_OS === \OS_TYPE_LINUX) {
            static::forkWorkersForLinux();
        } else {
            static::forkWorkersForWindows();
        }
}

protected static function forkWorkersForLinux() {
        // static::$_workers是什么,注意,我再次强调这个里并不是
        // worker进程,而是指Worker实例,比如你在用WM的时候,同时
        // 启动了两个Worker
        // $http_worker_1 = new Worker("http://0.0.0.0:2222");
        // $http_worker_2 = new Worker("http://0.0.0.0:1111");
        // static::$_workers数组中存储的则是这两个http worker实例
        foreach (static::$_workers as $worker) {
            // 此处逻辑比较简单,worker实例有一个属性叫做name,就是
            // 你可以给这个worker起一个名字
            // 如果你不显式地给name属性赋值,默认是none
            // 如果彻底为空,WM会把当前Worker实例d的socketname
            // 赋值给name,比如http_worker_1实例的socketName就是
            // http://0.0.0.0:2345
            if (static::$_status === static::STATUS_STARTING) {                
                if (empty($worker->name)) {
                    $worker->name = $worker->getSocketName();
                }
                $worker_name_length = \strlen($worker->name);
                if (static::$_maxWorkerNameLength < $worker_name_length) {
                    static::$_maxWorkerNameLength = $worker_name_length;
                }
            }
            // 还记得Worker对象的count属性吗?此处按照count属性数量fork出
            // 固定数量的worker进程,此处并没有用for循环来实现,而是使用while
            // 循环。每当fork成功一个worker子进程,就会将pid保存到static::$_pidMap[$worker->workerId]
            // 中去,当count发现当前Worker实例中子Worker进程数量少于count属性
            // 时,就持续while
            // 再次强调!⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️
            // 切记!!切记!!Worker实例 不是 Worker子进程!!!
            // $worker = new Worker("http://0.0.0.0:2222") 就是一个
            // worker实例,该worker实例拥有一个Master进程,Master进程需要
            // fork出count属性数目个的Worker子进程!!
            //⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️
            while (\count(static::$_pidMap[$worker->workerId]) < $worker->count) {
                static::forkOneWorkerForLinux($worker);
            }
        }
}

// 所以,真正的pcntl_fork()在forkOneWorkerForLinux()方法里
protected static function forkOneWorkerForLinux(self $worker) {
        // Get available worker id.
        // 这个地方比较有意思,乍一看有点儿神经病
        // 实际上你把这个getId函数看作是一个产生id的函数就行
        // $id是表示的是:一个Worker实例中每一个子worker进程的
        // id,比如有fork了4个子worker进程,那么这四个子worker进程
        // 的$id就是0 1 2 3 ~ 明白了哈~~
        $id = static::getId($worker->workerId, 0);
        if ($id === false) {
            return;
        }
        // 这句没啥好说的吧???。。。前面基础章节w我可不少说
        $pid = \pcntl_fork();
        // For master process.
        // $pid > 0分支,表示是父进程d的分支
        // 等于0的分支表示是 子进程的分支
        if ($pid > 0) {
            // 父进程主要就是保存 $_pidMap 和 $_idMap 两个数组即可
            static::$_pidMap[$worker->workerId][$pid] = $pid;
            static::$_idMap[$worker->workerId][$id]   = $pid;
        } // For child processes.
        // 可能有人不明白为什么要写成 0 == $pid 
        // 很简单,$pid == 0也没问题,但是,如果你
        // 在写$pid == 0的时候,一不小心手贱少写了一个=
        // 那就变成 $pid = 0,我可告诉你,这会儿任何编辑器
        // 都不会报错,程序照常运行,但是至于逻辑对不对,呵呵哒~~~
        elseif (0 === $pid) {
            // ??????????
            // ??????????
            // 这两个沙雕函数是干啥的?也没个返回值...
            // 注意:此处我没有验证,属于个人猜测,纯属猜测
            // 众所周知,计算机中的随机数都是伪随机数,并不是
            // 真正的随机。之前可能有过Worker多个子进程产生伪随机数
            // 相同的情况,因为worker进程fork出来后应该是继承的父进程
            // 的随机种子。所以,这里呢,先用srand将每个worker子进程
            // 的种子先打乱一下,估计这样的话,后面在业务逻辑里写
            // 获取伪随机数的代码应该就会得到不同的伪随机数了
            // 瞎TM猜,如果不对了,你也没辙...
            // 你们试下吧,好吧?
            \srand();
            \mt_srand();
            // 这里是socket部分,如果端口复用,就直接listen
            // 这里listen就是socket中的listen
            // 这里部分我们到后面socket时候再详细说
            if ($worker->reusePort) {
                $worker->listen();
            }
            // resetStd()函数里,对各种输入、输出,该关闭的关闭
            // 该重定向的重定向...
            if (static::$_status === static::STATUS_STARTING) {
                static::resetStd();
            }
            static::$_pidMap  = array();
            // Remove other listener. 
            // 这里是啥意思呢?
            // 留到socket的时候再讲吧.
            foreach(static::$_workers as $key => $one_worker) {
                if ($one_worker->workerId !== $worker->workerId) {
                    $one_worker->unlisten();
                    unset(static::$_workers[$key]);
                }
            }
            // 清除掉所有原来的定时器.
            Timer::delAll();
            // 给进程起一个名字,方便在ps -ef的时候辨别
            // 还记得我去宇宙条那道口头面试题么?感受一下...
            static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
            $worker->setUserAndGroup();
            $worker->id = $id;
            // run()函数中,就是 event-loop 了!
            // 这里我们到socket的时候,再详细
            $worker->run();
            $err = new Exception('event-loop exited');
            static::log($err);
            exit(250);
        } else {
            throw new Exception("forkOneWorker fail");
        }
}

所以fork这里的流程就是这样咯~我们应该学会主动忽略细枝末节或者当前对我们暂不重要的地方,这里有个方法论就是:

要知道当前阶段的主要矛盾,当前主要矛盾是进程,而不是socket

好了,当Master进程按照配置将子Worker进程fork出来后,Master进程需要对这一坨子进程进行监管了,也就是MoniterWorkers()函数咯~你们感受一下:

// 不解释了...
// 这要再解释了,我都感觉是在侮辱诸君了
protected static function monitorWorkers() {
        if (static::$_OS === \OS_TYPE_LINUX) {
            static::monitorWorkersForLinux();
        } else {
            static::monitorWorkersForWindows();
        }
}

// 直接跳到monitorWorkersForLinux()~
protected static function monitorWorkersForLinux() {
        // 首先将整个worker实例的状态设置为running ~ 
        static::$_status = static::STATUS_RUNNING;
        // 这里就比较好玩了,workerman是如何保证程序在后台持续运行
        // 不退出的呢?
        // 记住了,这里要划线,要考!
        // Master主进程就靠下面这个可能令你惊讶的while 1来搞定
        // Worker子进程就众所周知全靠event-loop来保证了
        // 今天主要矛盾是 Master进程的while
        // Master进程就靠while保证不退出并持续对Worker进程进行
        while (1) {
            // Calls signal handlers for pending signals.
            // 这个,没啥好说的吧???
            // 派发信号,说白了就是让信号监听函数生效
            // 前面基础讲过
            \pcntl_signal_dispatch();
            // Suspends execution of the current process until a child has exited, or until a signal is delivered
            $status = 0;
            // 这个也没啥好说的吧
            // 使用wait来回收子进程,避免僵尸进程
            // 前面基础讲过...
            $pid    = \pcntl_wait($status, \WUNTRACED);
            // Calls signal handlers for pending signals again.
            // 再次派发信号,使用pcntl_signal_dispatch就是有
            // 这么个缺陷,需要在上次调用生效后,立马在dispatch起来
            // 不然只有第一次生效,后面统统不生效
            // 前面基础章节讲过
            \pcntl_signal_dispatch();
            // If a child has already exited.
            if ($pid > 0) {
                // Find out witch worker process exited.
                // 下面这个foreach循环,别看代码一大坨
                // 实际上,贼没意思
                // 因为一个worker子进程已经退出了,所以
                // 这里要把之前的$_pidMap等数组中信息中与刚死掉
                // 的那个worker子进程相关的所有信息全部清理掉
                foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
                    if (isset($worker_pid_array[$pid])) {
                        $worker = static::$_workers[$worker_id];
                        // Exit status.
                        if ($status !== 0) {
                            static::log("worker[" . $worker->name . ":$pid] exit with status $status");
                        }

                        // For Statistics.
                        if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
                            static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
                        }
                        ++static::$_globalStatistics['worker_exit_info'][$worker_id][$status];

                        // Clear process data.
                        unset(static::$_pidMap[$worker_id][$pid]);
                        // Mark id is available.
                        $id                              = static::getId($worker_id, $pid);
                        static::$_idMap[$worker_id][$id] = 0;

                        break;
                    }
                }
                // Is still running state then fork a new worker process.
                // 这里比较有意思。你要考虑到的是,worker进程退出可能并不因为是
                // 停止服务,还有一种情况就是reload,又或者worker子进程响应过
                // xxx次请求后自动销毁并拉起一个新的(不知道wm有没有这功能),
                // 这些情况下除了要回收垃圾信息外,到最后还要再重新拉起一个新的
                // worker子进程来补充进来。
                // Workerman里如何判定是【停止服务】还是【热加载服务】呢?
                // 就靠status状态属性咯~~~
                if (static::$_status !== static::STATUS_SHUTDOWN) {
                    // 这里继续通过 forkWorker()函数来fork子进程
                    // 还记得吗?是通过while循环+判断count属性来实现的
                    static::forkWorkers();
                    // If reloading continue
                    // ... 这里有个reload,但是reload如果要运行,有一个条件
                    // 那就是当前退出的worker子进程必须要在 $_pidsToRestart 
                    // 数组中
                    if (isset(static::$_pidsToRestart[$pid])) {
                        unset(static::$_pidsToRestart[$pid]);
                        //  啊哈哈,重点在这里
                        // 每当一个子进程退出后,并且该子进程在$_pidsToRestart
                        // 数组里,那么Master进程就要执行一下reload,所以reload
                        // 里有什么?
                        static::reload();
                    }
                }
            }

            // If shutdown state and all child processes exited then master process exit.
            // 如果是真要退出服务了。。。清理所有垃圾数据,然后再退出
            if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
                static::exitAndClearAll();
            }
        }
}

正如你所看到的这样,我单独把reload()拿出来了,因为reload的逻辑非常绕,我估计已经绕到弄不好亮哥自己都想不起来咋回事了...这里我们必须要从php index.php reload操作开始说起,不然真的就说不清楚了。

我们得先感受下输入php index.php reload后的整体是啥样的...我用灵魂手法做了一个不错的流程图,你们感受下:

能感受到么?感受不到就看代码吧...

protected static function reload() {
        // For master process.
        // 当 reload 的时候,master进程和worker进程的响应d动作应该是
        // 不一样的,所以下面得分开处理对待
        if (static::$_masterPid === \posix_getpid()) {
            // Set reloading state.
            // 下面这一坨别看多。。。实际上还好
            // 主要目的就是 一、设置当前状态为reloading
            // 二、通过call_user_func触发onWorkerReload回调函数
            if (static::$_status !== static::STATUS_RELOADING && static::$_status !== static::STATUS_SHUTDOWN) {
                static::log("Workerman[" . \basename(static::$_startFile) . "] reloading");
                static::$_status = static::STATUS_RELOADING;
                // Try to emit onMasterReload callback.
                if (static::$onMasterReload) {
                    try {
                        // 这个技巧值得注意!
                        \call_user_func(static::$onMasterReload);
                    } catch (\Exception $e) {
                        static::log($e);
                        exit(250);
                    } catch (\Error $e) {
                        static::log($e);
                        exit(250);
                    }
                    static::initId();
                }
            }
            // 这个。。。你们一定能看懂是
            if (static::$_gracefulStop) {
                $sig = \SIGQUIT;
            } else {
                $sig = \SIGUSR1;
            }
            // Send reload signal to all child processes.
            $reloadable_pid_array = array();
            // $_pidMap的结构我就不再赘述了
            // 使用foreach遍历所有Worker实例
            // 这里就是要把当前所有Worker实例的所有worker子进程pid全部
            foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
                // 当前worker实例
                $worker = static::$_workers[$worker_id];
                // reloadable这个属性的意思是我有必要说下:
                // 设置当前Worker实例是否可以reload,即收到reload信号后是否退出重启。
                // 不设置默认为true,收到reload信号后自动重启进程
                if ($worker->reloadable) {
                    foreach ($worker_pid_array as $pid) {
                        $reloadable_pid_array[$pid] = $pid;
                    }
                } else {
                    foreach ($worker_pid_array as $pid) {
                        // Send reload signal to a worker process which reloadable is false.
                        \posix_kill($pid, $sig);
                    }
                }
            }

            // Get all pids that are waiting reload.
            // 这个地方非常非常有意思,因为这里这个数组将会成为结束
            // 执行的条件
            static::$_pidsToRestart = \array_intersect(static::$_pidsToRestart, $reloadable_pid_array);

            // Reload complete.
            // 看到没?如果说$_pidsToRestart已经是空了,reload方法就彻底
            // 结束执行了
            if (empty(static::$_pidsToRestart)) {
                if (static::$_status !== static::STATUS_SHUTDOWN) {
                    static::$_status = static::STATUS_RUNNING;
                }
                return;
            }
            // Continue reload.
            // 这里也是关键,奇怪,为什么一个current方法,就能Continue reload.
            // 呢??关键看下一行!
            $one_worker_pid = \current(static::$_pidsToRestart);
            // Send reload signal to a worker process.
            // 啊哈,想子进程pid发送signal,这里的signal不是sigquit就是sigusr1
            // 你去信号捕捉那里看下,当捕捉到sigquit就是sigusr1后
            // 信号处理器里就会再次执行static::reload()!
            // 说白了就是 static::reload()中Master代码段+signal-handler
            // 共同组成了一个循环逻辑!
            // 而结束这个循环的条件就是static::$_pidsToRestart数组w为空!
            \posix_kill($one_worker_pid, $sig);
            // If the process does not exit after static::KILL_WORKER_TIMER_TIME seconds try to kill it.
            // !保证子进程一定被干挺了!
            if(!static::$_gracefulStop){
                Timer::add(static::KILL_WORKER_TIMER_TIME, '\posix_kill', array($one_worker_pid, \SIGKILL), false);
            }
        } // For child processes.
        else {
            \reset(static::$_workers);
            $worker = \current(static::$_workers);
            // Try to emit onWorkerReload callback.
            if ($worker->onWorkerReload) {
                try {
                    \call_user_func($worker->onWorkerReload, $worker);
                } catch (\Exception $e) {
                    static::log($e);
                    exit(250);
                } catch (\Error $e) {
                    static::log($e);
                    exit(250);
                }
            }
            // 子进程执行realod()的时候,如果reloadable会true
            // 就会exit自己,exit自己会让Master进程收到sigchld信号
            // master进程中monitorWorkers()方法的pcntl_wait会开始执行
            // 然后Master进程知道有子进程退出后,会做下善后工作,
            // 具体上面 MoniterWorkers方法已经看过了
            // 然后Master进程就会再fork个新的子进程出来顶替原来exit
            // 掉的子进程!
            if ($worker->reloadable) {
                static::stopAll();
            }
       }
}

好了!reload()方法分析完毕!不出意外应该需要消化一阵儿了,我给大家的建议就是打开WM源码,打断点结合日志然后加上上面的东西,仔细跑几遍流程基本上就能绕过来了。

而我也如约憋住了自己:今天一个段子也没讲。最后送给大家一首诗吧,来自于《遥远的救世主》中男主的一首打油诗:

无题 --- 丁元英

本是后山人,偶做前堂客,醉舞经论半卷书,坐井说天阔! 大志戏功名,海斗量福祸,轮到囊中羞涩时,怒指乾坤错!

2019年12月20日 01时55分23秒

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能API社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档