前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python backoff 实现轮询

Python backoff 实现轮询

作者头像
用户7685359
发布2020-08-21 19:50:44
1.6K0
发布2020-08-21 19:50:44
举报
文章被收录于专栏:FluentStudyFluentStudy

我们经常在开发中会遇到这样一种场景,即轮询操作。今天介绍一个Python库,用于更方便的达到轮询的效果——backoff。

backoff 模块简介及安装

这个模块主要提供了是一个装饰器,用于装饰函数,使得它在遇到某些条件时会重试(即反复执行被装饰的函数)。通常适用于我们在获取一些不可靠资源,比如会间歇性故障的资源等。

此外,装饰器支持正常的同步方法,也支持异步asyncio代码。

backoff 模块的安装也很简单,通过 pip 即可安装完成:

代码语言:javascript
复制
pip install backoff
backoff 用法及简单源码分析

backoff 提供两个主要的装饰器,通过 backoff. 调用,通过提示我们可以看到这两个装饰器,分别是:

代码语言:javascript
复制
backoff.on_predicate
backoff.on_exception

通过 github 查看 backoff 的源码,源码目录 backoff/_decorator.py,定义如下:

代码语言:javascript
复制
def on_predicate(wait_gen,
                 predicate=operator.not_,
                 max_tries=None,
                 max_time=None,
                 jitter=full_jitter,
                 on_success=None,
                 on_backoff=None,
                 on_giveup=None,
                 logger='backoff',
                 **wait_gen_kwargs):
    # 省略具体代码
    # 每个参数的定义在源码中都给出了明确的解释
    pass

def on_exception(wait_gen,
                 exception,
                 max_tries=None,
                 max_time=None,
                 jitter=full_jitter,
                 giveup=lambda e: False,
                 on_success=None,
                 on_backoff=None,
                 on_giveup=None,
                 logger='backoff',
                 **wait_gen_kwargs):
    # 省略具体代码
    # 每个参数的定义在源码中都给出了明确的解释
    pass

可以看到,定义了很多的参数,这些参数在源码中都给出了比较详细的解释,这里做简单的介绍:

首先,wait_gen:表示每次循环等待的时长,以秒为单位。它的类型是一个生成器,在 backoff 中内置了三个生成器。我们查看下源码,目录为 backoff/_wait_gen.py。我们取其中一个的详细实现来看下:

代码语言:javascript
复制
# 省略实现代码

# base * factor * n
def expo(base=2, factor=1, max_value=None):
    """Generator for exponential decay.
    Args:
        base: The mathematical base of the exponentiation operation
        factor: Factor to multiply the exponentation by.
        max_value: The maximum value to yield. Once the value in the
             true exponential sequence exceeds this, the value
             of max_value will forever after be yielded.
    """
    n = 0
    while True:
        a = factor * base ** n
        if max_value is None or a < max_value:
            yield a
            n += 1
        else:
            yield max_value

# 通过斐波那契数列控制
def fibo(max_value=None):
    pass

# 常量数值
def constant(interval=1):
    pass

从源码不难看出,通过一些策略,每次 yield 返回不同的数值,这些数值就是重试等待秒数。当然因为这个参数类型是生成器,显然我们也是可以自定义的。同时我们会发现每个 wait_gen 都是参数控制的,所以我们理应是可以修改这个参数的初始值的。

显然,wait_gen_kwargs就是用来传递这些参数的,它是通过可变关键字参数控制的,可以直接用 key=value 的形式进行传参,简单示例如下:

代码语言:javascript
复制
@backoff.on_predicate(backoff.constant, interval=5)
def main3():
    print("time is {} retry...".format(time.time()))

predictexception。这两个相对比较简单,predict 接受一个函数,当这个函数返回 True 时会进行重试,否则停止,同时这个函数接受一个参数,这个参数的值是被装饰函数的返回值。这个参数的默认值是:operator._not。这个函数的源码如下:

代码语言:javascript
复制
def not_(a):
    "Same as not a."
    return not a

所以默认返回的是 not 被装饰函数的返回值。如果当被装饰函数并没有返回值时,返回 True,会进行重试。

示例代码如下:

代码语言:javascript
复制
import backoff
import time

@backoff.on_predicate(backoff.fibo)
def test2():
    print("time is {}, retry...".format(time.time()))

if __name__ == "__main__":
    test2()

# 等价于:

# 必须接受一个参数,这个参数的值是被装饰函数的返回值
def condition(r):
    return True

@backoff.on_predicate(backoff.fibo, condition)
def test2():
    print("time is {}, retry...".format(time.time()))

if __name__ == "__main__":
    test2()

执行结果如下:

代码语言:javascript
复制
$ python3 backoff_test.py
time is 1571801845.834578, retry...
time is 1571801846.121314, retry...
time is 1571801846.229812, retry...
time is 1571801846.533237, retry...
time is 1571801849.460303, retry...
time is 1571801850.8974788, retry...
time is 1571801856.498335, retry...
time is 1571801861.56931, retry...
time is 1571801872.701226, retry...
time is 1571801879.198495, retry...
...

需要注意几点:

  • 如果自定义这个参数对应的函数,这个函数是需要接受一个参数的,这个参数的值是被装饰函数的返回值。我们可以通过控制这个返回值来做一些条件判断,当达到某些特殊条件时重试结束。
  • 示例中 wait_gen 用的是 backoff.fibo,注意观察输出的时间单隔,这里的时间间隔好像并不像我们想象中按 fibo 返回的时间间隔数,实际上如果想达到这个效果,我们需要将 jitter 参数设置为 None,后面介绍 jitter 参数时再做说明。

而 exception 则是接受异常类型的实例,可以是单个异常,也可以是元组形式的多个异常。简单示例如下:

代码语言:javascript
复制
import time
import random
import backoff
from collections import deque


class MyException(Exception):
    def __init__(self, message, status):
        super().__init__(message, status)
        self.message = message
        self.status = status

class MyException2(Exception):
    pass


@backoff.on_exception(backoff.expo, (MyException, MyException2))
def main():
    random_num = random.randint(0, 9)
    print("retry...and random num is {}".format(random_num))
    if random_num % 2 == 0:
        raise MyException("my exception", int("1000" + str(random_num)))
    raise MyException2()

max_triesmax_time 也比较简单,分别代表最大重试次数与最长重试时间。这里就不做演示了。

@backoff.on_exception 中的 giveup,它接受一个异常实例,通过对这个实例做一些条件判断,达到判断是否需要继续循环的目的。如果返回 True,则结束,反之继续。默认值一直是返回 False,即会一直循环。示例如下:

代码语言:javascript
复制
import random
import backoff

class MyException(Exception):
    def __init__(self, message, status):
        super().__init__(message, status)
        self.message = message
        self.status = status

def exception_status(e):
    print('exception status code is {}'.format(e.status))
    return e.status % 2 == 0


@backoff.on_exception(backoff.expo, MyException, giveup=exception_status)
def main():
    random_num = random.randint(0, 9)
    print("retry...and random num is {}".format(random_num))
    raise MyException("my exception", int("1000" + str(random_num)))


if __name__ == "__main__":
    main()

运行结果:

代码语言:javascript
复制
retry...and random num is 5
exception status code is 10005
retry...and random num is 0
exception status code is 10000
# 会再走一遍 raise 的代码,所以异常仍然会抛出来
Traceback (most recent call last):
  File "backoff_test.py", line 36, in <module>
    main()
  File "/Users/demon/code/python/exercise/.venv/lib/python3.7/site-packages/backoff/_sync.py", line 94, in retry
    ret = target(*args, **kwargs)
  File "backoff_test.py", line 32, in main
    raise MyException("my exception", int("1000" + str(random_num)))
__main__.MyException: ('my exception', 10000)

需要注意两点:

  • 这个参数接受的函数仍然只有一个参数,这个参数的值是一个异常实例对象
  • 从结果我们可以看出,当抛出异常时,会先进入 giveup 接受的函数,如果函数判断需要 giveup 时,当前的异常仍然会抛出。所以有需要,代码仍然需要做异常逻辑处理。

on_successon_backoffon_giveup 这三个是一类的参数,用于做事件处理:

  • on_sucess 事件会比较难理解一点,它表示的是被装饰函数成功结束轮询则会退出,对于 on_exception 来说即当被装饰函数没有发生异常时则会调用 on_success。而对于 on_predicate 来说即是通过 predicate 关键字返回为 False 结束循环则会调用。
  • on_backoff 即当程序产生循环时会调用
  • on_giveup 当程序是达到当前可尝试最大次数后,会调用。对于 on_predicate 如果是通过 max_tries 或者 max_time 会调用,而对于 on_exception ,对于 exception 参数返回 True 时也会调用 on_giveup

总结来说,max_tries 和 max_time 这种直接控制结束的,调用的是 on_giveup,而 exception 参数也是通过返回 True 则程序就结束,它是用来控制程序结束的,所以也会调用 on_giveup。而 predicate 参数返回 True 则程序继续,它是用来控制程序是否继续徨的,所以当它结束时,调用的是 on_success。

实验代码如下:

代码语言:javascript
复制
'''
@Author: Demon
@Date: 2019-10-22 15:30:32
@LastEditors: Demon
@LastEditTime: 2019-10-23 14:37:13
@Description: backoff
'''

import time
import random
import backoff

class MyException(Exception):
    def __init__(self, status, message):
        super().__init__(status, message)
        self.status = status
        self.message = message


def backoff_hdlr(details):
    print("Backing off {wait:0.1f} seconds afters {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


def success_hdlr(details):
    print("Success offafters {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


def giveup_hdlr(details):
    print("Giveup off {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


@backoff.on_predicate(
    backoff.constant,
    # 当 random num 不等 10009 则继续
    # 当 random_num 等于 10009 后,会调用 on_success
    lambda x: x != 10009,
    on_success=success_hdlr,
    on_backoff=backoff_hdlr,
    on_giveup=giveup_hdlr,
    max_time=2)
def main():
    num = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), num))
    return num


@backoff.on_exception(
    backoff.constant,
    MyException,
    # 当 Exception 实例对象的 status 为 10009 成立时退出
    # 当条件成立时,调用的是 on_giveup
    giveup=lambda e: e.status == 10009,
    on_success=success_hdlr,
    on_backoff=backoff_hdlr,
    on_giveup=giveup_hdlr,
    )
def main2():
    num = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), num))
    # 如果是通过这个条件成立退出,调用的是 on_success
    if num == 10010:
        return
    raise MyException(num, "hhh")


if __name__ == "__main__":
    #main()
    main2()

logger 参数,很显然就是用来控制日志输出的,这里不做详细介绍。copy 官方文档的一个示例:

代码语言:javascript
复制
my_logger = logging.getLogger('my_logger')
my_handler = logging.StreamHandler()
my_logger.add_handler(my_handler)
my_logger.setLevel(logging.ERROR)

@backoff.on_exception(backoff.expo,
                     requests.exception.RequestException,
                     logger=my_logger)
# ...

最后一个参数,jitter,开始也不是很明白这个参数的作用,文档的解释如下:

jitter: A function of the value yielded by wait_gen returning the actual time to wait. This distributes wait times stochastically in order to avoid timing collisions across concurrent clients. Wait times are jittered by default using the full_jitter function. Jittering may be disabled altogether by passing jitter=None.

有点晕,于是去看了下源码,明白了用法,截取关键源码如下:

代码语言:javascript
复制
# backoff/_decorator.py
def on_predicate(wait_gen,
                 predicate=operator.not_,
                 max_tries=None,
                 max_time=None,
                 jitter=full_jitter,
                 on_success=None,
                 on_backoff=None,
                 on_giveup=None,
                 logger='backoff',
                 **wait_gen_kwargs):
        pass  # 省略
        # 因为没有用到异步,所以会进到这里
        if retry is None:
            retry = _sync.retry_predicate

# backoff/_sync
# 分析可以看到有一句获取下次 wait 时长
seconds = _next_wait(wait, jitter, elapsed, max_time_)

# backoff/_common
def _next_wait(wait, jitter, elapsed, max_time):
    value = next(wait)
    try:
        if jitter is not None:
            seconds = jitter(value)
        else:
            seconds = value
    except TypeError:
        warnings.warn(
            "Nullary jitter function signature is deprecated. Use "
            "unary signature accepting a wait value in seconds and "
            "returning a jittered version of it.",
            DeprecationWarning,
            stacklevel=2,
        )

        seconds = value + jitter()

    # don't sleep longer than remaining alloted max_time
    if max_time is not None:
        seconds = min(seconds, max_time - elapsed)

    return seconds

看前面几行代码应该就会比较清晰了,如果 jitter 为 None,则会使用第一个参数返回的 value 值,而如果使用了,则会在这个 value 值上再做一次算法,默认为 full_jitter(value)。backoff/_jitter.py 提供了两个算法,代码不长,贴上来看看:

代码语言:javascript
复制
import random


def random_jitter(value):
    """Jitter the value a random number of milliseconds.
    This adds up to 1 second of additional time to the original value.
    Prior to backoff version 1.2 this was the default jitter behavior.
    Args:
        value: The unadulterated backoff value.
    """
    return value + random.random()


def full_jitter(value):
    """Jitter the value across the full range (0 to value).
    This corresponds to the "Full Jitter" algorithm specified in the
    AWS blog's post on the performance of various jitter algorithms.
    (http://www.awsarchitectureblog.com/2015/03/backoff.html)
    Args:
        value: The unadulterated backoff value.
    """
    return random.uniform(0, value)

到这里,backoff 的基本用法也就结束了。当然它也支持异步的方法装饰,用法差不多,这里不再深入。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FluentStudy 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • backoff 模块简介及安装
    • backoff 用法及简单源码分析
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档