我正在调用Python中的一个函数,我知道该函数可能会停止并迫使我重新启动脚本。
我如何调用这个函数,或者我用什么包装它,这样如果花费的时间超过5秒,脚本就会取消它并执行其他操作?
发布于 2012-12-11 21:41:31
我有一个不同的建议,它是一个纯函数(具有与线程建议相同的API ),并且似乎工作得很好(基于这个线程的建议)。
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
发布于 2016-02-02 04:02:33
在单元测试中搜索超时调用时,我遇到了这个线程。我在答案或第三方包中找不到任何简单的东西,所以我写了下面的装饰器,你可以直接进入代码:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
然后,让一个测试或任何你喜欢的函数超时就像下面这样简单:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
发布于 2015-02-15 20:43:32
在pypi上找到的stopit
包似乎很好地处理了超时。
我喜欢@stopit.threading_timeoutable
装饰器,它向装饰性函数添加了一个timeout
参数,这会如您所期望的那样停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
https://stackoverflow.com/questions/492519
复制相似问题