supervisor源码解析

上篇文章介绍了supervisor的使用, 今天介绍一下supervisor的源码。 supervisor是python写的。如果你不懂python,我也介绍一下golang版的实现。 如果你不会,其他语言的实现,应该也有,github找一下。 自己读一下。实现原理应该都是一样的。

源码

supervisor的组件

1. supervisord

服务器主进程名为supervisord。它负责在自己的调用中启动子程序,响应来自客户端的命令,重新启动崩溃或退出的子进程,记录其子进程stdout和stderr 输出,以及生成和处理与子进程生命周期中的点相对应的“事件”。

服务器进程使用配置文件。这通常位于/etc/supervisord.conf中。此配置文件是“Windows-INI”样式配置文件。通过适当的文件系统权限保持此文件的安全非常重要,因为它可能包含未加密的用户名和密码。

2. supervisorctl

主进程的命令行客户端部分名为 supervisorctl。它为supervisord提供的功能提供了类似shell的界面。从 supervisorctl,用户可以连接到不同的 supervisord进程(一次一个),获取由子进程控制的状态,停止和启动子进程,并获取supervisord的运行进程列表。

命令行客户机通过UNIX域套接字或internet (TCP)套接字与服务器通信。服务器可以断言客户机的用户应该在执行命令之前提供身份验证凭据。客户机进程通常使用与服务器相同的配置文件,但是其中包含[supervisorctl]部分的任何配置文件都可以工作。

3. Web Server

如果您针对internet套接字启动了adminord,那么可以通过浏览器访问具有与supervise orctl类似功能的(稀疏的)web用户界面。在激活配置文件的[inet_http_server]部分之后,访问服务器URL(例如http://localhost:9001/),通过web界面查看和控制进程状态。

4. XML-RPC Interface

服务于web UI的同一个HTTP服务器提供一个XML-RPC接口,该接口可用于询问和控制管理器及其运行的程序。参见XML-RPC API文档。

整体看一下目录结构:

├── __init__.py
├── childutils.py
├── compat.py
├── confecho.py
├── datatypes.py
├── dispatchers.py
├── events.py
├── http.py
├── http_client.py
├── loggers.py
├── medusa
│   ├── CHANGES.txt
│   ├── LICENSE.txt
│   ├── README.txt
│   ├── TODO.txt
│   ├── __init__.py
│   ├── asynchat_25.py
│   ├── asyncore_25.py
│   ├── auth_handler.py
│   ├── counter.py
│   ├── default_handler.py
│   ├── docs
│   │   ├── README.html
│   │   ├── async_blurbs.txt
│   │   ├── composing_producers.gif
│   │   ├── data_flow.gif
│   │   ├── data_flow.html
│   │   ├── producers.gif
│   │   ├── programming.html
│   │   ├── proxy_notes.txt
│   │   ├── threads.txt
│   │   └── tkinter.txt
│   ├── filesys.py
│   ├── http_date.py
│   ├── http_server.py
│   ├── logger.py
│   ├── producers.py
│   ├── util.py
│   └── xmlrpc_handler.py
├── options.py
├── pidproxy.py
├── poller.py
├── process.py
├── rpcinterface.py
├── scripts
│   ├── loop_eventgen.py
│   ├── loop_listener.py
│   ├── sample_commevent.py
│   ├── sample_eventlistener.py
│   └── sample_exiting_eventlistener.py
├── skel
│   └── sample.conf
├── socket_manager.py
├── states.py
├── supervisorctl.py
├── supervisord.py
├── tests
│   ├── __init__.py
│   ├── base.py
│   ├── fixtures
│   │   ├── donothing.conf
│   │   ├── example
│   │   │   └── included.conf
│   │   ├── hello.sh
│   │   ├── include.conf
│   │   ├── issue-1054.conf
│   │   ├── issue-565.conf
│   │   ├── issue-638.conf
│   │   ├── issue-663.conf
│   │   ├── issue-664.conf
│   │   ├── issue-835.conf
│   │   ├── issue-836.conf
│   │   ├── listener.py
│   │   ├── spew.py
│   │   └── unkillable_spew.py
│   ├── test_childutils.py
│   ├── test_confecho.py
│   ├── test_datatypes.py
│   ├── test_dispatchers.py
│   ├── test_end_to_end.py
│   ├── test_events.py
│   ├── test_http.py
│   ├── test_http_client.py
│   ├── test_loggers.py
│   ├── test_options.py
│   ├── test_poller.py
│   ├── test_process.py
│   ├── test_rpcinterfaces.py
│   ├── test_socket_manager.py
│   ├── test_states.py
│   ├── test_supervisorctl.py
│   ├── test_supervisord.py
│   ├── test_web.py
│   └── test_xmlrpc.py
├── ui
│   ├── images
│   │   ├── button_refresh.gif
│   │   ├── button_restart.gif
│   │   ├── button_stop.gif
│   │   ├── icon.png
│   │   ├── rule.gif
│   │   ├── state0.gif
│   │   ├── state1.gif
│   │   ├── state2.gif
│   │   ├── state3.gif
│   │   └── supervisor.gif
│   ├── status.html
│   ├── stylesheets
│   │   └── supervisor.css
│   └── tail.html
├── version.txt
├── web.py
└── xmlrpc.py

整体架构

源码学习按上面的分块进行介绍

supervisord

先找到入口程序

# Main program
def main(args=None, test=False):
    assert os.name == "posix", "This code makes Unix-specific assumptions"
    # if we hup, restart by making a new Supervisor()
    first = True
    while 1:
        options = ServerOptions()
        options.realize(args, doc=__doc__)
        options.first = first
        options.test = test
        if options.profile_options:
            sort_order, callers = options.profile_options
            profile('go(options)', globals(), locals(), sort_order, callers)
        else:
            go(options)
        options.close_httpservers()
        options.close_logger()
        first = False
        if test or (options.mood < SupervisorStates.RESTARTING):
            break

if __name__ == "__main__": # pragma: no cover
    main()

从上面看出, main()中有个死循环一直只工作。 下面详细介绍,在循环中做了哪些工作?

# Main program
def main(args=None, test=False):
    assert os.name == "posix", "This code makes Unix-specific assumptions"
    # if we hup, restart by making a new Supervisor()
    first = True
    while 1:
        options = ServerOptions() // 配置
        options.realize(args, doc=__doc__)
        options.first = first
        options.test = test
        if options.profile_options:
            sort_order, callers = options.profile_options
            profile('go(options)', globals(), locals(), sort_order, callers)
        else:
            go(options) // 加载配置开始运行
        options.close_httpservers()
        options.close_logger()
        first = False
        if test or (options.mood < SupervisorStates.RESTARTING):
            break

def go(options): # pragma: no cover
    d = Supervisor(options) // 实例化一个Supervisor对象
    try:
    d.main()  // 运行main()函数
    except asyncore.ExitNow:
        pass

Supervisor类的代码

class Supervisor:
    stopping = False  # set after we detect that we are handling a stop request
    lastshutdownreport = 0  # throttle for delayed process error reports at stop
    process_groups = None  # map of process group name to process group object
    stop_groups = None  # list used for priority ordered shutdown

    def __init__(self, options): # 初始化
        self.options = options # 配置
        self.process_groups = {}
        self.ticks = {}

    def main(self):
        if not self.options.first:
            # prevent crash on libdispatch-based systems, at least for the
            # first request
            self.options.cleanup_fds()

        self.options.set_uid_or_exit()

        if self.options.first:
            self.options.set_rlimits_or_exit()

        # this sets the options.logger object
        # delay logger instantiation until after setuid
        self.options.make_logger()

        if not self.options.nocleanup:
            # clean up old automatic logs
            self.options.clear_autochildlogdir()

        self.run() # 运行

    def run(self):
        self.process_groups = {}  # clear
        self.stop_groups = None  # clear
        events.clear()
        try:
            # 根据配置进行添加process
            for config in self.options.process_group_configs:
                self.add_process_group(config)
            # 进程环境
            self.options.process_environment()
            # 打开http web
            self.options.openhttpservers(self)
            # 用于捕获信号
            self.options.setsignals()
            # 主进程是否成为守护进程
            if (not self.options.nodaemon) and self.options.first:
                self.options.daemonize()
            # writing pid file needs to come *after* daemonizing or pid
            # will be wrong
            self.options.write_pidfile()
            # 运行异步io服务器
            self.runforever()
        finally:
            # 异常退出,清理工作
            self.options.cleanup()

上面代码只有 self.runforever() 是工作的

    def runforever(self):
       # 事件通知机制
        events.notify(events.SupervisorRunningEvent())
        timeout = 1  # this cannot be fewer than the smallest TickEvent (5)
        # 获取已经注册的句柄
        socket_map = self.options.get_socket_map()
        
        # 这里会一直 运行,相当于守护进程
        while 1:
            # 保存运行信息等
            combined_map = {}
            combined_map.update(socket_map)
            combined_map.update(self.get_process_map())
            
            # 进程信息
            pgroups = list(self.process_groups.values())
            pgroups.sort()
            
            # 根据进程配置开启或关闭进程
            if self.options.mood < SupervisorStates.RUNNING:
                if not self.stopping:
                    # first time, set the stopping flag, do a
                    # notification and set stop_groups
                    self.stopping = True
                    self.stop_groups = pgroups[:]
                    events.notify(events.SupervisorStoppingEvent())

                self.ordered_stop_groups_phase_1()

                if not self.shutdown_report():
                    # if there are no unstopped processes (we're done
                    # killing everything), it's OK to shutdown or reload
                    raise asyncore.ExitNow
                    
            for fd, dispatcher in combined_map.items():
                if dispatcher.readable():
                    self.options.poller.register_readable(fd)
                if dispatcher.writable():
                    self.options.poller.register_writable(fd)
            # poll操作
            r, w = self.options.poller.poll(timeout)

            for fd in r:
                if fd in combined_map:
                    try:
                        dispatcher = combined_map[fd]
                        self.options.logger.blather(
                            'read event caused by %(dispatcher)r',
                            dispatcher=dispatcher)
                        dispatcher.handle_read_event()
                        if not dispatcher.readable():
                            self.options.poller.unregister_readable(fd)
                    except asyncore.ExitNow:
                        raise
                    except:
                        combined_map[fd].handle_error()
            # 依次遍历注册的文件句柄
            for fd in w:
                if fd in combined_map:
                    try:
                        dispatcher = combined_map[fd]
                        self.options.logger.blather(
                            'write event caused by %(dispatcher)r',
                            dispatcher=dispatcher)
                        dispatcher.handle_write_event()
                        if not dispatcher.writable():
                            self.options.poller.unregister_writable(fd)
                    except asyncore.ExitNow:
                        raise
                    except:
                        combined_map[fd].handle_error()

            for group in pgroups:
                group.transition()

            # 获取已经死亡的子进程信息
            self.reap()
            # 处理信号
            self.handle_signal()
            
            # tick时钟
            self.tick()

            if self.options.mood < SupervisorStates.RUNNING:
                self.ordered_stop_groups_phase_2()

            if self.options.test:
                break

下面详细介绍一下如何管理一个process: 下面图可以指导主要是

    def add_process_group(self, config):
        name = config.name
        if name not in self.process_groups:
            config.after_setuid()
            
            # 根据初始化后的配置文件生成相应的子进程实例
            self.process_groups[name] = config.make_group()
            # 添加事件通知
            events.notify(events.ProcessGroupAddedEvent(name))
            return True
        return False

在supervisor配置中,我们需要写上执行的文件以及执行环境env. 所以,我们围绕 执行这个程序的逻辑就可以了

    def execve(self, filename, argv, env):
        return os.execve(filename, argv, env)

作者将所有的process或event的配置 都绑定到 options的对象上, 包括执行的程序以及各种状态, 然后一个deamon程序一直在运行,去实时检查配置是否发生变化了。做对应的操作。

supervisorclt

这个客户端功能:

  1. 需要有一个UI,实现是cli
  2. 与server通信
def main(args=None, options=None):
    if options is None:
       # 实例化对象
        options = ClientOptions()

    options.realize(args, doc=__doc__)
    # 控制器
    c = Controller(options)

    if options.args:
        c.onecmd(" ".join(options.args))
        sys.exit(c.exitstatus)
    
    # 如果是交互模式
    if options.interactive:
        # 一直loop, 图形编程常见操作模式
        c.exec_cmdloop(args, options)
        sys.exit(0)  # exitstatus always 0 for interactive mode

我们看交互式的逻辑,exec_cmdloop

    # 控制器的method
    def exec_cmdloop(self, args, options):
        try:
            import readline
            delims = readline.get_completer_delims()
            delims = delims.replace(':', '')  # "group:process" as one word
            delims = delims.replace('*', '')  # "group:*" as one word
            delims = delims.replace('-', '')  # names with "-" as one word
            readline.set_completer_delims(delims)

            if options.history_file:
                try:
                    readline.read_history_file(options.history_file)
                except IOError:
                    pass

                def save():
                    try:
                        readline.write_history_file(options.history_file)
                    except IOError:
                        pass

                import atexit
                atexit.register(save)
        except ImportError:
            pass
        try:
            self.cmdqueue.append('status')
            # cmdloop
            self.cmdloop()
        except KeyboardInterrupt:
            self.output('')
            pass
       
    # cmdloop method定义  
    def cmdloop(self, intro=None):
        """Repeatedly issue a prompt, accept input, parse an initial prefix
        off the received input, and dispatch to action methods, passing them
        the remainder of the line as argument.

        """

        self.preloop()
        # 输入和快捷键
        if self.use_rawinput and self.completekey:
            try:
                import readline
                self.old_completer = readline.get_completer()
                readline.set_completer(self.complete)
                readline.parse_and_bind(self.completekey+": complete")
            except ImportError:
                pass
        try:
            if intro is not None:
                self.intro = intro
            if self.intro:
                self.stdout.write(str(self.intro)+"\n")
            stop = None
            while not stop:
                if self.cmdqueue:
                    line = self.cmdqueue.pop(0)
                else:
                    if self.use_rawinput:
                        try:
                            line = input(self.prompt)
                        except EOFError:
                            line = 'EOF'
                    else:
                        self.stdout.write(self.prompt)
                        self.stdout.flush()
                        line = self.stdin.readline()
                        if not len(line):
                            line = 'EOF'
                        else:
                            line = line.rstrip('\r\n')
                line = self.precmd(line)
                stop = self.onecmd(line)
                stop = self.postcmd(stop, line)
            self.postloop()
        finally:
            if self.use_rawinput and self.completekey:
                try:
                    import readline
                    readline.set_completer(self.old_completer)
                except ImportError:
                    pass

上面执行又会执行到onecmd

    def onecmd(self, line):
        """Interpret the argument as though it had been typed in response
        to the prompt.

        This may be overridden, but should not normally need to be;
        see the precmd() and postcmd() methods for useful execution hooks.
        The return value is a flag indicating whether interpretation of
        commands by the interpreter should stop.

        """
        cmd, arg, line = self.parseline(line)
        if not line:
            return self.emptyline()
        if cmd is None:
            return self.default(line)
        self.lastcmd = line
        if line == 'EOF' :
            self.lastcmd = ''
        if cmd == '':
            return self.default(line)
        else:
            try:
                func = getattr(self, 'do_' + cmd)
            except AttributeError:
                return self.default(line)
            return func(arg)

可以看出 会返回一个func = getattr(self, 'do_' + cmd),函数调用,是以do_开头的函数 在supervisorctl的源码文件中, 我们找一个示例

    def do_start(self, arg):
        if not self.ctl.upcheck():
            return

        names = arg.split()
        # 连接supervisor服务的代理
        supervisor = self.ctl.get_supervisor()

        if not names:
            self.ctl.output("Error: start requires a process name")
            self.ctl.exitstatus = LSBInitExitStatuses.INVALID_ARGS
            self.help_start()
            return
        
        # 开启所有
        if 'all' in names:
            # 调用startAllProcesses函数
            results = supervisor.startAllProcesses()
            for result in results:
                self.ctl.output(self._startresult(result))
                self.ctl.set_exitstatus_from_xmlrpc_fault(result['status'], xmlrpc.Faults.ALREADY_STARTED)
        else:
            # 逐个关闭
            for name in names:
                group_name, process_name = split_namespec(name)
                if process_name is None:
                    try:
                        results = supervisor.startProcessGroup(group_name)
                        for result in results:
                            self.ctl.output(self._startresult(result))
                            self.ctl.set_exitstatus_from_xmlrpc_fault(result['status'], xmlrpc.Faults.ALREADY_STARTED)
                    except xmlrpclib.Fault as e:
                        if e.faultCode == xmlrpc.Faults.BAD_NAME:
                            error = "%s: ERROR (no such group)" % group_name
                            self.ctl.output(error)
                            self.ctl.exitstatus = LSBInitExitStatuses.INVALID_ARGS
                        else:
                            self.ctl.exitstatus = LSBInitExitStatuses.GENERIC
                            raise
                else:
                    try:
                        result = supervisor.startProcess(name)
                    except xmlrpclib.Fault as e:
                        error = {'status': e.faultCode,
                                  'name': process_name,
                                  'group': group_name,
                                  'description': e.faultString}
                        self.ctl.output(self._startresult(error))
                        self.ctl.set_exitstatus_from_xmlrpc_fault(error['status'], xmlrpc.Faults.ALREADY_STARTED)
                    else:
                        name = make_namespec(group_name, process_name)
                        self.ctl.output('%s: started' % name)

上面比较重要的就是远程通过rpc调用

class SupervisorNamespaceRPCInterface:
    ....
    def startProcessGroup(self, name, wait=True):
        """ Start all processes in the group named 'name'

        @param string name     The group name
        @param boolean wait    Wait for each process to be fully started
        @return array result   An array of process status info structs
        """
        self._update('startProcessGroup')

        group = self.supervisord.process_groups.get(name)

        if group is None:
            raise RPCError(Faults.BAD_NAME, name)

        processes = list(group.processes.values())
        processes.sort()
        processes = [ (group, process) for process in processes ]

        startall = make_allfunc(processes, isNotRunning, self.startProcess,
                                wait=wait)

        startall.delay = 0.05
        startall.rpcinterface = self
        return startall # deferred

web UI

def make_http_servers(options, supervisord):
    from supervisor.web import supervisor_ui_handler
    uihandler = supervisor_ui_handler(supervisord)
    
class supervisor_ui_handler:
    IDENT = 'Supervisor Web UI HTTP Request Handler'

    def __init__(self, supervisord):
        self.supervisord = supervisord

    def match(self, request):
        if request.command not in ('POST', 'GET'):
            return False

        path, params, query, fragment = request.split_uri()

        while path.startswith('/'):
            path = path[1:]

        if not path:
            path = 'index.html'

        for viewname in VIEWS.keys():
            if viewname == path:
                return True

    def handle_request(self, request):
        if request.command == 'POST':
            request.collector = collector(self, request)
        else:
            self.continue_request('', request)

    def continue_request (self, data, request):
        form = {}
        cgi_env = request.cgi_environment()
        form.update(cgi_env)
        if 'QUERY_STRING' not in form:
            form['QUERY_STRING'] = ''

        query = form['QUERY_STRING']

        # we only handle x-www-form-urlencoded values from POSTs
        form_urlencoded = parse_qsl(data)
        query_data = parse_qs(query)

        for k, v in query_data.items():
            # ignore dupes
            form[k] = v[0]

        for k, v in form_urlencoded:
            # ignore dupes
            form[k] = v

        form['SERVER_URL'] = request.get_server_url()

        path = form['PATH_INFO']
        # strip off all leading slashes
        while path and path[0] == '/':
            path = path[1:]
        if not path:
            path = 'index.html'

        viewinfo = VIEWS.get(path)
        if viewinfo is None:
            # this should never happen if our match method works
            return

        response = {'headers': {}}

        viewclass = viewinfo['view']
        viewtemplate = viewinfo['template']
        context = ViewContext(template=viewtemplate,
                              request = request,
                              form = form,
                              response = response,
                              supervisord=self.supervisord)
        view = viewclass(context)
        pushproducer = request.channel.push_with_producer
        pushproducer(DeferredWebProducer(request, view))

RPC

class supervisor_xmlrpc_handler(xmlrpc_handler):
    path = '/RPC2'
    IDENT = 'Supervisor XML-RPC Handler'

    unmarshallers = {
        "int": lambda x: int(x.text),
        "i4": lambda x: int(x.text),
        "boolean": lambda x: x.text == "1",
        "string": lambda x: x.text or "",
        "double": lambda x: float(x.text),
        "dateTime.iso8601": lambda x: make_datetime(x.text),
        "array": lambda x: x[0].text,
        "data": lambda x: [v.text for v in x],
        "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
        "base64": lambda x: as_string(decodestring(as_bytes(x.text or ""))),
        "param": lambda x: x[0].text,
    }

    def __init__(self, supervisord, subinterfaces):
        self.rpcinterface = RootRPCInterface(subinterfaces)
        self.supervisord = supervisord

    def loads(self, data):
        params = method = None
        for action, elem in iterparse(StringIO(data)):
            unmarshall = self.unmarshallers.get(elem.tag)
            if unmarshall:
                data = unmarshall(elem)
                elem.clear()
                elem.text = data
            elif elem.tag == "value":
                try:
                    data = elem[0].text
                except IndexError:
                    data = elem.text or ""
                elem.clear()
                elem.text = data
            elif elem.tag == "methodName":
                method = elem.text
            elif elem.tag == "params":
                params = tuple([v.text for v in elem])
        return params, method

    def match(self, request):
        return request.uri.startswith(self.path)

    def continue_request(self, data, request):
        logger = self.supervisord.options.logger

        try:
            try:
                # on 2.x, the Expat parser doesn't like Unicode which actually
                # contains non-ASCII characters. It's a bit of a kludge to
                # do it conditionally here, but it's down to how underlying
                # libs behave
                if PY2:
                    data = data.encode('ascii', 'xmlcharrefreplace')
                params, method = self.loads(data)
            except:
                logger.error(
                    'XML-RPC request data %r is invalid: unmarshallable' %
                    (data,)
                )
                request.error(400)
                return

            # no <methodName> in the request or name is an empty string
            if not method:
                logger.error(
                    'XML-RPC request data %r is invalid: no method name' %
                    (data,)
                )
                request.error(400)
                return

            # we allow xml-rpc clients that do not send empty <params>
            # when there are no parameters for the method call
            if params is None:
                params = ()

            try:
                logger.trace('XML-RPC method called: %s()' % method)
                value = self.call(method, params)
                logger.trace('XML-RPC method %s() returned successfully' %
                             method)
            except RPCError as err:
                # turn RPCError reported by method into a Fault instance
                value = xmlrpclib.Fault(err.code, err.text)
                logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
                    method,
                    err.code, err.text))

            if isinstance(value, types.FunctionType):
                # returning a function from an RPC method implies that
                # this needs to be a deferred response (it needs to block).
                pushproducer = request.channel.push_with_producer
                pushproducer(DeferredXMLRPCResponse(request, value))

            else:
                # if we get anything but a function, it implies that this
                # response doesn't need to be deferred, we can service it
                # right away.
                body = as_bytes(xmlrpc_marshal(value))
                request['Content-Type'] = 'text/xml'
                request['Content-Length'] = len(body)
                request.push(body)
                request.done()

        except:
            tb = traceback.format_exc()
            logger.critical(
                "Handling XML-RPC request with data %r raised an unexpected "
                "exception: %s" % (data, tb)
            )
            # internal error, report as HTTP server error
            request.error(500)

    def call(self, method, params):
        return traverse(self.rpcinterface, method, params)

总结

python是解释型语言 弱类型,看源码没有编译型语言清晰明了。这个源码不太容易看,我参考supervisor的设计写公司项目,一开始是看这个源码,发现很不容易理清楚,我是先看了go语言版的,然后在回头看python版的才看明白。我后面整理go版本的,那个更加清晰。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券