Gunicorn 'Green Unicorn' is a Python WSGI HTTP Server for UNIX. It's a pre-fork worker model. The Gunicorn server is broadly compatible with various web frameworks, simply implemented, light on server resources, and fairly speedy.
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
In other words, gunicorn has a single master process that manages multiple worker processes, and every request is handled by a worker process.
The example from the official gunicorn website looks like this:
$ pip install gunicorn
$ cat myapp.py
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])
$ gunicorn -w 4 myapp:app
[2014-09-10 10:22:28 +0000] [30869] [INFO] Listening at: http://127.0.0.1:8000 (30869)
[2014-09-10 10:22:28 +0000] [30869] [INFO] Using worker: sync
[2014-09-10 10:22:28 +0000] [30874] [INFO] Booting worker with pid: 30874
[2014-09-10 10:22:28 +0000] [30875] [INFO] Booting worker with pid: 30875
[2014-09-10 10:22:28 +0000] [30876] [INFO] Booting worker with pid: 30876
[2014-09-10 10:22:28 +0000] [30877] [INFO] Booting worker with pid: 30877
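Because a WSGI application is just a callable, the app above can be exercised without any server. The following is a minimal sketch and not part of gunicorn: call_wsgi and its start_response stub are hypothetical helpers written for illustration, and the environ dict is left empty, which works only because this particular app ignores it.

```python
def app(environ, start_response):
    # Same application as in myapp.py above.
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_wsgi(application, environ=None):
    """Hypothetical helper: invoke a WSGI callable and collect its output."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        # A real server would write these to the socket; here we record them.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(application(environ or {}, start_response))
    return captured["status"], dict(captured["headers"]), body


status, headers, body = call_wsgi(app)
print(status, headers["Content-Length"], body)
```

This is roughly the contract a worker has to honour: build environ, supply start_response, and iterate over the returned body.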
The first step in reading the source is to locate the entry point. We know that gunicorn is invoked like this:
gunicorn -w 4 myapp:app
Anyone who has written a Python package knows where to look for the entry point: the setup.py file.
setup(
    ...,
    entry_points="""
    [console_scripts]
    gunicorn=gunicorn.app.wsgiapp:run
    gunicorn_paster=gunicorn.app.pasterapp:run
    ...
    """
)
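A console_scripts entry of the form name=module:attr means that the generated command imports the module and calls the named attribute. The sketch below shows that resolution step in isolation. It is an illustration only: resolve_entry_point is a hypothetical helper, the real work is done by the packaging tools, and a standard-library target is used so that the example runs without gunicorn installed.

```python
import importlib


def resolve_entry_point(spec):
    """Hypothetical helper: turn 'package.module:attr' into the object."""
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Stand-in spec from the standard library instead of gunicorn.app.wsgiapp:run.
join = resolve_entry_point("os.path:join")
print(join("gunicorn", "app", "wsgiapp.py"))
```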
In other words, the entry point is in gunicorn/app/wsgiapp.py, so we go straight to the run function:
def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
As we can see, it simply instantiates a WSGIApplication object and then calls its run method.
Let us first look at what instantiating WSGIApplication does. Neither WSGIApplication nor its parent class Application implements __init__, so we go directly to the __init__ method of BaseApplication, the parent class of Application.
class BaseApplication(object):
    """
    An application interface for configuring and loading
    the various necessities for any given web framework.
    """
    def __init__(self, usage=None, prog=None):
        ...
        self.do_load_config()
    def do_load_config(self):
        """
        Loads the configuration
        """
        try:
            self.load_default_config()
            self.load_config()
        except Exception as e:
            ...
    def load_default_config(self):
        # init configuration
        self.cfg = Config(self.usage, prog=self.prog)
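The call chain during instantiation looks like this: __init__ -> do_load_config -> load_default_config & load_config
In other words, instantiating WSGIApplication loads the configuration: self.cfg = Config()
Next, let us see how the Config object loads the configuration.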
KNOWN_SETTINGS = []
...
def make_settings(ignore=None):
    settings = {}
    ignore = ignore or ()
    for s in KNOWN_SETTINGS:
        setting = s()
        if setting.name in ignore:
            continue
        settings[setting.name] = setting.copy()
    return settings
...
class Config(object):
    def __init__(self, usage=None, prog=None):
        self.settings = make_settings()
        ...
    def __getattr__(self, name):
        if name not in self.settings:
            raise AttributeError("No configuration setting for: %s" % name)
        return self.settings[name].get()
    def __setattr__(self, name, value):
        if name != "settings" and name in self.settings:
            raise AttributeError("Invalid access!")
        super(Config, self).__setattr__(name, value)
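From the snippet above we can see that instantiating a Config object iterates over the elements of the KNOWN_SETTINGS list. In the code, however, KNOWN_SETTINGS is an empty list, which raises a question: when are elements added to KNOWN_SETTINGS? Searching the file for KNOWN_SETTINGS reveals an interesting trick: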
class SettingMeta(type):
    def __new__(cls, name, bases, attrs):
        super_new = super(SettingMeta, cls).__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            return super_new(cls, name, bases, attrs)
        attrs["order"] = len(KNOWN_SETTINGS)
        attrs["validator"] = wrap_method(attrs["validator"])
        new_class = super_new(cls, name, bases, attrs)
        new_class.fmt_desc(attrs.get("desc", ""))
        KNOWN_SETTINGS.append(new_class)
        return new_class
...
class Setting(object):
    ...
Setting = SettingMeta('Setting', (Setting,), {})
...
class ConfigFile(Setting):
    ...
class Bind(Setting):
    ...
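This snippet uses a Python metaclass. Setting is a class created by the SettingMeta metaclass, so every subclass of Setting is created by SettingMeta as well, and each time such a class is created it is appended to the KNOWN_SETTINGS list. As a result, make_settings returns an instance of every class that inherits from Setting, except those listed in ignore. Attribute access on a Config instance is then proxied to the corresponding setting instance.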
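The registration trick is easier to see in a stripped-down form. The sketch below is not gunicorn's code: it uses Python 3 metaclass syntax instead of the explicit SettingMeta(...) call shown above, the Bind and Workers classes and their defaults are simplified stand-ins, and validation, ordering and descriptions are left out.

```python
KNOWN_SETTINGS = []  # filled in as a side effect of class creation


class SettingMeta(type):
    def __new__(mcls, name, bases, attrs):
        new_class = super().__new__(mcls, name, bases, attrs)
        # Skip the base class itself; register only its subclasses.
        if any(isinstance(base, SettingMeta) for base in bases):
            KNOWN_SETTINGS.append(new_class)
        return new_class


class Setting(metaclass=SettingMeta):
    name = None
    default = None

    def __init__(self):
        self.value = self.default

    def get(self):
        return self.value


class Bind(Setting):
    name = "bind"
    default = "127.0.0.1:8000"


class Workers(Setting):
    name = "workers"
    default = 1


class Config:
    def __init__(self):
        # One fresh instance per registered setting class.
        self.settings = {cls.name: cls() for cls in KNOWN_SETTINGS}

    def __getattr__(self, name):
        # Only called when normal lookup fails; guard 'settings' itself
        # to avoid infinite recursion before __init__ has run.
        if name == "settings" or name not in self.settings:
            raise AttributeError("No configuration setting for: %s" % name)
        return self.settings[name].get()


cfg = Config()
print([cls.__name__ for cls in KNOWN_SETTINGS], cfg.bind, cfg.workers)
```

Simply defining a new Setting subclass is enough to make it appear in every Config created afterwards, which is why the list looks empty when read in isolation.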
Now back to the run method. WSGIApplication does not implement run itself, so the part that matters is the run implementation in the base class BaseApplication.
class BaseApplication(object):
    ...
    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
class Application(BaseApplication):
    ...
    def run(self):
        ...
        super(Application, self).run()
Arbiter is a very important class in gunicorn. Roughly speaking, WSGIApplication only manages gunicorn's configuration, while Arbiter is what manages the workers.
The master process is a simple loop that listens for various process signals and reacts accordingly. It manages the list of running workers by listening for signals like TTIN, TTOU, and CHLD. TTIN and TTOU tell the master to increase or decrease the number of running workers. CHLD indicates that a child process has terminated, in this case the master process automatically restarts the failed worker.
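The master process runs a loop that listens for signals and handles them; by reacting to these signals it manages the number of running workers.
The run method is where the master process's loop lives.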
class Arbiter(object):
    ...
    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)
        try:
            self.manage_workers()
            while True:
                self.maybe_promote_master()
                sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
        ...
Let us see what start does:
class Arbiter(object):
    ...
    def start(self):
        """\
        Initialize the arbiter. Start listening and set pidfile if needed.
        """
        self.log.info("Starting gunicorn %s", __version__)
        if 'GUNICORN_PID' in os.environ:
            self.master_pid = int(os.environ.get('GUNICORN_PID'))
            self.proc_name = self.proc_name + ".2"
            self.master_name = "Master.2"
        self.pid = os.getpid()
        if self.cfg.pidfile is not None:
            pidname = self.cfg.pidfile
            if self.master_pid != 0:
                pidname += ".2"
            self.pidfile = Pidfile(pidname)
            self.pidfile.create(self.pid)
        self.cfg.on_starting(self)
        self.init_signals()
        if not self.LISTENERS:
            fds = None
            listen_fds = systemd.listen_fds()
            if listen_fds:
                self.systemd = True
                fds = range(systemd.SD_LISTEN_FDS_START,
                            systemd.SD_LISTEN_FDS_START + listen_fds)
            elif self.master_pid:
                fds = []
                for fd in os.environ.pop('GUNICORN_FD').split(','):
                    fds.append(int(fd))
            self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
        listeners_str = ",".join([str(l) for l in self.LISTENERS])
        self.log.debug("Arbiter booted")
        self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
        self.log.info("Using worker: %s", self.cfg.worker_class_str)
        # check worker class requirements
        if hasattr(self.worker_class, "check_config"):
            self.worker_class.check_config(self.cfg, self.log)
        self.cfg.when_ready(self)
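Two things in start deserve a closer look: self.init_signals, which registers the signal handlers, and LISTENERS, the listening sockets created by sock.create_sockets.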
class Arbiter(object):
    ...
    def init_signals(self):
        """\
        Initialize master signal handling. Most of the signals
        are queued. Child signals only wake up the master.
        """
        # close old PIPE
        if self.PIPE:
            [os.close(p) for p in self.PIPE]
        # initialize the pipe
        self.PIPE = pair = os.pipe()
        for p in pair:
            util.set_non_blocking(p)
            util.close_on_exec(p)
        self.log.close_on_exec()
        # initialize all signals
        [signal.signal(s, self.signal) for s in self.SIGNALS]
        signal.signal(signal.SIGCHLD, self.handle_chld)
    def signal(self, sig, frame):
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)
            self.wakeup()
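init_signals first closes the existing pipe pair self.PIPE, then creates a new pipe pair, initializes it, and registers the signal handlers. Every signal except SIGCHLD is handled by the signal method, which appends the signal to the signal queue and then wakes the master up, provided the queue is not full. Once the queue holds five signals, further signals are dropped without any processing.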
LISTENERS
def _sock_type(addr):
    if isinstance(addr, tuple):
        if util.is_ipv6(addr[0]):
            sock_type = TCP6Socket
        else:
            sock_type = TCPSocket
    elif isinstance(addr, string_types):
        sock_type = UnixSocket
    else:
        raise TypeError("Unable to create socket from: %r" % addr)
    return sock_type
def create_sockets(conf, log, fds=None):
    """
    Create a new socket for the configured addresses or file descriptors.
    If a configured address is a tuple then a TCP socket is created.
    If it is a string, a Unix socket is created. Otherwise, a TypeError is
    raised.
    """
    listeners = []
    # get it only once
    laddr = conf.address
    # check ssl config early to raise the error on startup
    # only the certfile is needed since it can contains the keyfile
    if conf.certfile and not os.path.exists(conf.certfile):
        raise ValueError('certfile "%s" does not exist' % conf.certfile)
    if conf.keyfile and not os.path.exists(conf.keyfile):
        raise ValueError('keyfile "%s" does not exist' % conf.keyfile)
    # sockets are already bound
    if fds is not None:
        for fd in fds:
            sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            sock_name = sock.getsockname()
            sock_type = _sock_type(sock_name)
            listener = sock_type(sock_name, conf, log, fd=fd)
            listeners.append(listener)
        return listeners
    # no sockets is bound, first initialization of gunicorn in this env.
    for addr in laddr:
        sock_type = _sock_type(addr)
        sock = None
        for i in range(5):
            try:
                sock = sock_type(addr, conf, log)
            except socket.error as e:
                ...
            else:
                break
        if sock is None:
            log.error("Can't connect to %s", str(addr))
            sys.exit(1)
        listeners.append(sock)
    return listeners
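create_sockets creates sockets from the configured addresses or from existing file descriptors. If a configured address is a tuple, a TCP socket is created; if it is a string, a Unix socket is created. These sockets are ultimately consumed by the workers: every time a worker is created, the sockets are passed to it as an argument.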
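The address-to-socket-type dispatch can be sketched with the standard socket module alone. This is an illustration under assumptions: address_family is a hypothetical helper that returns an address family constant rather than one of gunicorn's socket wrapper classes, and it detects IPv6 with socket.inet_pton, which may differ from what util.is_ipv6 does.

```python
import socket


def address_family(addr):
    """Hypothetical helper mirroring the shape of _sock_type."""
    if isinstance(addr, tuple):
        host = addr[0]
        try:
            # inet_pton succeeds only for a literal IPv6 address.
            socket.inet_pton(socket.AF_INET6, host)
            return socket.AF_INET6
        except OSError:
            return socket.AF_INET
    if isinstance(addr, str):
        # A plain string is treated as a filesystem path for a Unix socket.
        return socket.AF_UNIX
    raise TypeError("Unable to create socket from: %r" % (addr,))


for addr in [("127.0.0.1", 8000), ("::1", 8000), "/tmp/gunicorn.sock"]:
    print(addr, address_family(addr))
```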
Back to Arbiter's run method: after start, it calls manage_workers.
class Arbiter(object):
    ...
    def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()
        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)
        active_worker_count = len(workers)
        if self._last_logged_active_worker_count != active_worker_count:
            self._last_logged_active_worker_count = active_worker_count
            self.log.debug("{0} workers".format(active_worker_count),
                           extra={"metric": "gunicorn.workers",
                                  "value": active_worker_count,
                                  "mtype": "gauge"})
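The reconciliation logic in manage_workers can be isolated as a pure function, which makes the "spawn if too few, kill the oldest if too many" rule easy to check. This is only a sketch: plan_workers is a hypothetical helper, and it takes a plain pid-to-age mapping instead of worker objects.

```python
def plan_workers(worker_ages, num_workers):
    """Hypothetical helper: decide how many workers to spawn and which to kill.

    worker_ages maps pid -> age, where a smaller age means the worker
    was created earlier.
    """
    to_spawn = max(0, num_workers - len(worker_ages))
    # Oldest first, matching the sort by age in manage_workers.
    by_age = sorted(worker_ages.items(), key=lambda item: item[1])
    surplus = max(0, len(by_age) - num_workers)
    to_kill = [pid for pid, _ in by_age[:surplus]]
    return to_spawn, to_kill


print(plan_workers({101: 1, 102: 2}, 4))
print(plan_workers({101: 1, 102: 2, 103: 3}, 1))
```

Because worker_age only ever increases, sorting by age and removing from the front retires the longest-running workers first.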
manage_workers keeps the number of workers at num_workers. The worker processes themselves are created in the spawn_worker method:
class Arbiter(object):
    ...
    def spawn_workers(self):
        """\
        Spawn new workers as needed.
        This is where a worker process leaves the main loop
        of the master process.
        """
        for i in range(self.num_workers - len(self.WORKERS.keys())):
            self.spawn_worker()
            time.sleep(0.1 * random.random())
    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid
        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        except ...
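The master process first instantiates worker_class; the default worker_class is SyncWorker. Any work that has to happen before forking can be done in the pre_fork hook, which is defined by the Prefork class in gunicorn.config. The fork then produces a child process, and the parent (the master) stores the instantiated worker object in self.WORKERS, keyed by pid, which here is the child's process ID. After that the parent is done with spawn_worker and returns immediately: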
worker.pid = pid
self.WORKERS[pid] = worker
return pid
The forked child process, on the other hand, continues executing the rest of spawn_worker. The main logic is:
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
except SystemExit:
    raise
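This raises a question: doesn't sys.exit(0) terminate the child process? The SystemExit exception is caught, but it is simply re-raised rather than handled. In fact, under normal circumstances the worker process never reaches that line, and the reason lies in the implementation of worker.init_process().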
class Worker(object):
    ...
    def init_process(self):
        ...
        self.run()
class SyncWorker(base.Worker):
    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            ...
    def run_for_multiple(self, timeout):
        while self.alive:
            ...
    def run(self):
        ...
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
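As we can see, both run_for_multiple and run_for_one, as implemented by the worker subclass, spend their time in a loop, so init_process does not return for as long as the worker is alive.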
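The parent/child split after os.fork can be reproduced in a few lines. This is a minimal sketch for UNIX systems, not gunicorn's implementation: this standalone spawn_worker takes a hypothetical work callable in place of worker.init_process(), and the child leaves through os._exit instead of sys.exit so that it skips any cleanup inherited from the parent.

```python
import os


def spawn_worker(work):
    """Hypothetical helper: fork, run work() in the child, return pid in parent."""
    pid = os.fork()
    if pid != 0:
        # Parent: remember the child's pid and carry on with its own loop.
        return pid
    # Child: must never fall back into the parent's code path.
    try:
        work()  # in gunicorn this is where the worker loop would run
    except BaseException:
        os._exit(1)
    os._exit(0)


pid = spawn_worker(lambda: None)
_, status = os.waitpid(pid, 0)
print(pid, os.WEXITSTATUS(status))
```

In the real worker, work would block in its accept loop, so the exit call after it is only reached once the loop ends.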
Back once more to Arbiter's run method, which has now entered its loop:
while True:
    self.maybe_promote_master()
    sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
    if sig is None:
        self.sleep()
        self.murder_workers()
        self.manage_workers()
        continue
    if sig not in self.SIG_NAMES:
        self.log.info("Ignoring unknown signal: %s", sig)
        continue
    signame = self.SIG_NAMES.get(sig)
    handler = getattr(self, "handle_%s" % signame, None)
    if not handler:
        self.log.error("Unhandled signal: %s", signame)
        continue
    self.log.info("Handling signal: %s", signame)
    handler()
    self.wakeup()
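On each iteration the loop takes one signal off the signal queue and hands it to the matching handle_<signame> method. If there is no signal to process, the master goes to sleep until it is woken up, and then calls murder_workers and manage_workers. This is the basic job of the master process.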
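The name-based dispatch used by the loop can be tried without sending any real signals. The sketch below is a stripped-down illustration, not gunicorn's Arbiter: MiniArbiter, process_one and the handled list are hypothetical names, and signals are placed on the queue by hand.

```python
import signal


class MiniArbiter:
    """Hypothetical, stripped-down dispatcher; not gunicorn's Arbiter."""

    # Map signal numbers to lowercase names, e.g. SIGHUP -> "hup".
    SIG_NAMES = {
        getattr(signal, "SIG" + name): name.lower()
        for name in ("HUP", "TERM", "USR1")
    }

    def __init__(self):
        self.sig_queue = []
        self.handled = []

    def handle_hup(self):
        self.handled.append("hup")

    def handle_term(self):
        self.handled.append("term")

    def process_one(self):
        """Pop one queued signal and dispatch it; return False if idle."""
        if not self.sig_queue:
            return False
        sig = self.sig_queue.pop(0)
        signame = self.SIG_NAMES.get(sig)
        # Look the handler up by name; signals without one are skipped.
        handler = getattr(self, "handle_%s" % signame, None)
        if handler is not None:
            handler()
        return True


arbiter = MiniArbiter()
arbiter.sig_queue += [signal.SIGHUP, signal.SIGUSR1, signal.SIGTERM]
while arbiter.process_one():
    pass
print(arbiter.handled)
```

Adding support for another signal then only requires defining a method with the right name; SIGUSR1 is silently skipped here because no handle_usr1 exists.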
Once the master process has gone to sleep, when is it woken up, and how? Let us look at how the master sleeps and wakes.
class Arbiter(object):
    ...
    def wakeup(self):
        """\
        Wake up the arbiter by writing to the PIPE
        """
        try:
            os.write(self.PIPE[1], b'.')
        except IOError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise
    ...
    def sleep(self):
        """\
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
        try:
            ready = select.select([self.PIPE[0]], [], [], 1.0)
            if not ready[0]:
                return
            while os.read(self.PIPE[0], 1):
                pass
        except select.error as e:
            if e.args[0] not in [errno.EAGAIN, errno.EINTR]:
                raise
        except OSError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise
        except KeyboardInterrupt:
            sys.exit()
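As we can see, Arbiter's sleep method watches the read end of the pipe created earlier, PIPE[0], and returns once that end has data or the 1.0 second select timeout expires. The wakeup method is called after a signal has been appended to the signal queue, and writes a byte to the write end of the pipe, PIPE[1].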
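This sleep/wakeup pair is an instance of the self-pipe technique, and it can be reproduced with the standard library. The sketch below is a simplified, module-level version under assumptions: it uses os.set_blocking (Python 3.5+) where gunicorn uses its own util helpers, reads in larger chunks, and returns a boolean so that the outcome is visible.

```python
import os
import select

read_fd, write_fd = os.pipe()
os.set_blocking(read_fd, False)
os.set_blocking(write_fd, False)


def wakeup():
    """Make the read end readable so a pending select() returns."""
    try:
        os.write(write_fd, b".")
    except BlockingIOError:
        pass  # pipe already full: a wakeup is pending anyway


def sleep(timeout=1.0):
    """Return True if woken through the pipe, False on timeout."""
    ready, _, _ = select.select([read_fd], [], [], timeout)
    if not ready:
        return False
    try:
        # Drain every pending byte so the next sleep() blocks again.
        while os.read(read_fd, 64):
            pass
    except BlockingIOError:
        pass
    return True


wakeup()
print(sleep(0.1))  # woken by the byte written above
print(sleep(0.1))  # nothing pending, so this one times out
```

Writing to a pipe is one of the few operations that is safe to perform from a signal handler, which is why the handler only queues the signal and pokes the pipe while the real work happens in the main loop.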
Now let us focus on the implementation of init_process in Worker:
class Worker(object):
    ...
    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """
        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v
        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)
        # Reseed the random number generator
        util.seed()
        # For waking ourselves up
        self.PIPE = os.pipe()
        ...
        self.wait_fds = self.sockets + [self.PIPE[0]]
        self.init_signals()
        ...
        self.load_wsgi()
        self.cfg.post_worker_init(self)
        # Enter main run loop
        self.booted = True
        self.run()
run
class Worker(object):
    ...
    def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)
        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)
        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
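When registering its signal handlers, the worker process also sets a wakeup file descriptor (self.PIPE[1]) through signal.set_wakeup_fd. When a signal is received, a single byte is written to that fd (here the write end of the pipe, self.PIPE[1]): a '\0' byte on older Python versions, the signal number on newer ones. This wakes up a pending poll or select call so that the signal can be handled.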
run
The run method is implemented by each subclass. Let us look at SyncWorker's run method:
class SyncWorker(base.Worker):
    ...
    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)
    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()
            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue
            except ...
            if not self.is_parent_alive():
                return
            try:
                self.wait(timeout)
            except StopWaiting:
                return
    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()
            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return
            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue
                    try:
                        self.accept(listener)
                    except ...
            if not self.is_parent_alive():
                return
    def run(self):
        # if no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevent it.
        timeout = self.timeout or 0.5
        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
Both run_for_one and run_for_multiple call the wait method:
def wait(self, timeout):
    try:
        self.notify()
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except ...
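wait blocks in select on the wait_fds list, which contains the socket list self.sockets and the read end of the worker's pipe, self.PIPE[0]. If any file descriptors are readable, it returns them. In other words, the worker process is woken up either by an incoming connection on a socket or by a signal (via signal.set_wakeup_fd).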
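Waiting on sockets and a wakeup pipe in a single select call can be demonstrated as follows. This is a sketch under assumptions: wait_readable is a hypothetical helper, a socketpair stands in for a listening socket with a pending client, and the byte on the pipe is written by hand rather than by signal.set_wakeup_fd.

```python
import os
import select
import socket


def wait_readable(fds, timeout):
    """Hypothetical helper: return the readable objects, or [] on timeout."""
    ready, _, _ = select.select(fds, [], [], timeout)
    return ready


# A socketpair stands in for a listening socket with a pending client.
client, listener = socket.socketpair()
pipe_r, pipe_w = os.pipe()
wait_fds = [listener, pipe_r]

idle = wait_readable(wait_fds, 0.05)           # nothing to do yet
client.send(b"GET / HTTP/1.0\r\n\r\n")
after_request = wait_readable(wait_fds, 0.05)  # the socket is readable
os.write(pipe_w, b"\0")
after_signal = wait_readable(wait_fds, 0.05)   # the pipe is readable too
print(len(idle), len(after_request), len(after_signal))
```

Since select reports every readable object, the caller has to tell the pipe apart from the sockets, which is what the check against self.PIPE[0] in run_for_multiple does.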
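As we can see, run_for_one and run_for_multiple take one or more sockets from the sockets list, call accept to establish the connection, and call handle to process the request. Request handling here is blocking: a worker handles only one request at a time.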
class SyncWorker(base.Worker):
    ...
    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = ssl.wrap_socket(client, server_side=True,
                                         **self.cfg.ssl_options)
            parser = http.RequestParser(self.cfg, client)
            req = six.next(parser)
            self.handle_request(listener, req, client, addr)
        except ...
    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                                        listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
        except ...
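The handle method parses the incoming request and calls handle_request, which builds the WSGI environ and response objects and passes them to the WSGI application. Finally, once the number of requests handled reaches max_requests, self.alive is set to False and the worker process exits after finishing the current request.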
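The request counter that retires a worker can be sketched on its own. The code below is an illustration, not gunicorn's SyncWorker: CountingWorker and hello are hypothetical names, there is no socket handling or HTTP parsing, and the environ dict is left empty.

```python
class CountingWorker:
    """Hypothetical stand-in for the request counter in handle_request."""

    def __init__(self, wsgi_app, max_requests):
        self.wsgi = wsgi_app
        self.max_requests = max_requests
        self.nr = 0
        self.alive = True

    def handle_request(self, environ):
        self.nr += 1
        if self.nr >= self.max_requests:
            # Finish this request, then let the run loop exit.
            self.alive = False
        seen = []

        def start_response(status, response_headers, exc_info=None):
            seen.append((status, response_headers))

        return b"".join(self.wsgi(environ, start_response))


def hello(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]


worker = CountingWorker(hello, max_requests=3)
served = 0
while worker.alive:  # same loop condition as run_for_one
    worker.handle_request({})
    served += 1
print(served)
```

Because the flag is only checked at the top of the loop, the request that hits the limit is still answered in full before the worker stops.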