作者:小小明
我们日常在使用的各种网络请求库时都带有timeout参数,例如request库。这个参数可以使请求超时就不再继续了,直接抛出超时错误,避免等太久。
如果我们自己开发的方法也希望增加这个功能,该如何做呢?
方法很多,但最简单直接的是使用并发库futures,为了使用方便,我将其封装成了一个装饰器,代码如下:
import functools
from concurrent import futures
executor = futures.ThreadPoolExecutor(1)
def timeout(seconds):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
future = executor.submit(func, *args, **kw)
return future.result(timeout=seconds)
return wrapper
return decorator
定义了以上函数,我们就有了一个超时结束的装饰器,下面可以测试一下:
import time
@timeout(1)
def task(a, b):
time.sleep(1.2)
return a+b
task(2, 3)
结果:
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
...
D:\Anaconda3\lib\concurrent\futures\_base.py in result(self, timeout)
432 return self.__get_result()
433 else:
--> 434 raise TimeoutError()
435
436 def exception(self, timeout=None):
TimeoutError:
上面我们通过装饰器定义了函数的超时时间为1秒,通过睡眠模拟函数执行超过1秒时,成功的抛出了超时异常。
程序能够在超时时间内完成时:
@timeout(1)
def task(a, b):
time.sleep(0.9)
return a+b
task(2, 3)
结果:
5
可以看到,顺利的得到了结果。
这样我们就可以通过一个装饰器给任何函数增加超时时间,这个函数在规定时间内还处理不完就可以直接结束任务。
前面我将这个装饰器将所需的变量定义到了外部,其实我们还可以通过类装饰器进一步封装,代码如下:
import functools
from concurrent import futures
class timeout:
__executor = futures.ThreadPoolExecutor(1)
def __init__(self, seconds):
self.seconds = seconds
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kw):
future = timeout.__executor.submit(func, *args, **kw)
return future.result(timeout=self.seconds)
return wrapper
经测试使用类装饰器能得到同样的效果。
注意:使用@functools.wraps的目的是因为被装饰的func函数元信息会被替换为wrapper函数的元信息,而@functools.wraps(func)将wrapper函数的元信息替换为func函数的元信息。最终虽然返回的是wrapper函数,元信息却依然是原有的func函数。 在函数式编程中,函数的返回值是函数对象被称为闭包。
如果我们需要记录部分函数的执行时间,函数执行前后打印一些日志,装饰器是一种很方便的选择。
代码如下:
import time
import functools
def log(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
res = func(*args, **kwargs)
end = time.perf_counter()
print(f'函数 {func.__name__} 耗时 {(end - start) * 1000} ms')
return res
return wrapper
装饰器 log 记录某个函数的运行时间,并返回其执行结果。
测试一下:
@log
def now():
print('2021-7-1')
now()
结果:
2021-7-1
函数 now 耗时 0.09933599994838005 ms
如果经常调用一个函数,而且参数经常会产生重复,如果把结果缓存起来,下次调用同样参数时就会节省处理时间。
定义函数:
import math
import random
import time
def task(x):
time.sleep(0.01)
return round(math.log(x**3 / 15), 4)
执行:
%%time
for i in range(500):
task(random.randrange(5, 10))
结果:
Wall time: 5.01 s
此时如果我们使用缓存的效果就会大不一样,实现缓存的装饰器有很多,我就不重复造轮子了,这里使用functools包下的LRU缓存:
from functools import lru_cache
@lru_cache()
def task(x):
time.sleep(0.01)
return round(math.log(x**3 / 15), 4)
执行:
%%time
for i in range(500):
task(random.randrange(5, 10))
结果:
Wall time: 50 ms
如果我们希望程序中的某个函数在整个程序的生命周期中只执行一次或N次,可以写一个这样的装饰器:
import functools
class allow_count:
def __init__(self, count):
self.count = count
self.i = 0
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kw):
if self.i >= self.count:
return
self.i += 1
return func(*args, **kw)
return wrapper
测试:
@allow_count(3)
def job(x):
x += 1
return x
for i in range(5):
print(job(i))
结果:
1
2
3
None
None