前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

作者头像
饶文津
发布2021-11-04 10:36:10
9340
发布2021-11-04 10:36:10
举报

最近一个 python 项目中同时用到了 gevent 和 multiprocessing。在优雅退出的实现上,出现了一些预料之外的问题。

一个简化版的代码,启动了4 个进程,每个进程里启动了两个协程,并注册了 SIGINT 等信号的回调函数来实现优雅退出:

import signal
import time
import multiprocessing
import gevent
from gevent import monkey
monkey.patch_all()  # NOQA


class WorkerManager():
    def __init__(self):
        self.is_running = multiprocessing.Value('b', True)

    def job(self):
        while self.is_running.value:
            print("job")
            time.sleep(3)

    def run(self):
        for sig in [signal.SIGINT, signal.SIGUSR1, signal.SIGTERM]:
            signal.signal(sig, signal.SIG_IGN)

        jobs = [gevent.spawn(self.job) for _ in range(2)]
        gevent.joinall(jobs)

    def start(self):
        self.workers = [multiprocessing.Process(
            target=self.run) for _ in range(4)]
        for worker in self.workers:
            worker.start()
        signal.signal(signal.SIGINT, self.graceful_exit)

    def graceful_exit(self, sig, frame):
        self.shutdown()
 
    def shutdown(self):
        if not self.is_running.value:
            return
        self.is_running.value = False
        for worker in self.workers:
            worker.join()
        

worker_manager = WorkerManager()
worker_manager.start()

上面的代码运行后,按ctrl+c会报下面的错误:

gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

相关的调用栈

  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File ".../venv/lib/python3.7/site-packages/gevent/os.py", line 380, in waitpid
    get_hub().wait(new_watcher)
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
  File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out

背景知识

  • 信号处理的原理是操作系统会把信号发给进程和该进程的子进程,每个进程原来的逻辑就中断了,然后调用我们注册的信号回调函数来处理。如果这个进程里有多个线程,那么此刻跑的是哪个线程,就从哪个线程中断。
  • gevent 是一个流行的 python 网络库,主要的功能就是在 python 中提供了一些事件循环的接口。它是基于 greenlet 实现的。greenlet 也可以理解为协程,就像 golang 里的 goroutine。
  • greenlet 的功能就是提供了在不同调用栈之间切换(switch)的能力。比如一会执行这个协程,然后它要阻塞等待一些 IO 操作,那就主动切换到另一个协程的调用栈去执行另一个协程。而 gevent 就对 greenlet 进行了一层封装,我们只用调用 gevent.spawn() 就可以创建并运行协程,gevent 会帮我们调度。gevent 还封装了一些操作系统自带的函数,比如 sleep。
  • 每个 greenlet 都会在一个线程上,一个线程上可以有多个 greenlet,但一次只有一个 greenlet 在运行。
  • 对于每个协程,都需要在一个 hub 里运行,hub 被翻译为集线器,hub 也是一个 greenlet,为什么又要搞个 greenlet 呢,因为它是帮我们做切换调用栈的家伙。
  • hub 里运行着事件循环(loop),什么是事件循环呢?就是说操作系统会发出事件通知你的程序,比如一个 socket 可以读了,你的程序就可以做相应处理。这种注册事件、等待着并在事件发生时做处理的流程就是事件循环。gevent 是基于 libev 这个库实现事件循环的。
  • 当我们调用 spawn 时,会创建一个新的 greenlet,并在 hub 里注册事件,事件循环收到事件通知时,就会调用我们的回调函数。而如果回调函数里有一个 sleep 之类的阻塞事件,gevent 的实现中就会进行 switch 操作,也就是切到 hub,等阻塞操作完成,就又会从 hub 里切换回来。
  • 调用 join 或 joinall 时,就会切换到 hub 里,会启动事件轮询来等待协程结束。

原因

回到我们的代码里,我们用了gevent 的 monkey.patch_all(),并且用到了 multiprocessing,而出错的调用栈中可以看到问题出在对子进程 join 时,这个 join 函数在 multiprocess 库里,调用了 os.waitpid,这里就会调用 gevent 实现的 os,由于是个阻塞操作,就会在 switch_out 时出错。为什么出错呢?这是 gevent 里相关的代码:

class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
    # Subclasses must define:
    # - self.loop

    # This class defines loop in its .pxd for Cython. This lets us avoid
    # circular dependencies with the hub.

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
        if switch_out is not None:
            switch_out()
        return _greenlet_switch(self) # pylint:disable=undefined-variable

    def switch_out(self):
        raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')

因为我们的程序收到信号中断时,主进程里没有其他的 greenlet,主进程里也没有其它运行的东西,所以运行着的是 hub 本身这个 greenlet,它会在一个线程里运行。所以 switch_out 时会找之前在跑的 greenlet(getcurrent()这个代码),结果就是 hub 本身。

一般 switch_out 是用来从一个普通的 greenlet 切换到 hub 里的,现在从 hub 里无法再切换到其它地方了。所以就是‘BlockingSwitchOutError’ 错误了。

参考:

解决方法

  1. 既然是 hub 里无法切出去,那我们可以把 shutdown 放到一个 greenlet 里:
    def graceful_exit(self, sig, frame):
        gevent.spawn(self.shutdown)

但如果主进程了没别的在跑,可能不会等 shutdown 运行完。

ps:一不小心写成了 self.shutdown(),后面加上了(),就和没改一样,所以报了一样的错了。

  1. 也可以不让 gevent 影响 multiprocess 里的 os 函数
monkey.patch_all(os=False)
  1. 还可以使用 gevent 提供的 signal 处理函数,它会在一个新的 greenlet 里运行。注意 monkey patch 不会修补默认的 signal.signal 函数。
gevent.signal_handler(signal.SIGINT, self.graceful_exit, signal.SIGINT, None)

这种方法同样的可能不会等 shutdown 运行完。。

  1. 让代码最后加上这段也可以。因为这样,主进程就忙着跑 while True 了,而没有切到 hub 所在的线程。 但是占 cpu 资源。
while True:
    pass
  1. 可以将 join 的代码拿出来,在主线程里调用,不要放到回调函数里,不然会被 hub 线程运行。
    def join(self):
        for worker in self.worker_process:
            worker.join()
            worker.close()

    def shutdown(self):
        if not self.is_running.value:
            return
        self.is_running.value = False
...

worker_manager.join()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-11-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景知识
  • 原因
    • 参考:
    • 解决方法
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档