首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >重新尝试Python的ThreadPoolExecutor中失败的未来

重新尝试Python的ThreadPoolExecutor中失败的未来
EN

Stack Overflow用户
提问于 2022-08-06 11:35:42
回答 2查看 286关注 0票数 2

我想用Python的concurrent.futures.ThreadPoolExecutor实现重试逻辑。我想要下列属性:

一旦工作队列失败,就会将新的未来添加到工作队列中。重新尝试的未来可以被再次重试,可以无限期地或者最高重试计数。

我在网上发现的很多现有代码基本上都是在“回合”中运行的,他们在最初的期货列表中调用as_completed,重新提交失败的期货,将这些期货收集到一个新的列表中,然后返回到新列表上的as_completed (如果不是空的话)。基本上是这样的:

代码语言:javascript
代码运行次数:0
运行
复制
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures

然而,我认为这并不能满足第一个属性,因为重新尝试的未来有可能在初始期货之前完成,但在所有初始期货完成之前,我们不会处理它。

这是一个假设的病理病例。假设我们有两个任务,第一个任务运行1秒,但失败的几率为90%,而第二个任务运行100秒。如果我们的执行者有两个工作人员,而第一个任务在1秒后失败,我们将立即重试它。但是,如果它再次失败,我们将无法重新尝试,直到第二个工作完成。

所以我的问题是,是否可以使用这些所需的属性实现重试逻辑,而无需使用外部库或重写低级执行器逻辑?我尝试过的一件事是将重试逻辑放在发送给工作人员的代码中:

代码语言:javascript
代码运行次数:0
运行
复制
def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

但似乎从一份工作中提交新工作是行不通的。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-08-09 08:46:56

使用as_completed重试

简单方式

循环使用wait(..., return_when=FIRST_COMPLETED)而不是as_completed(...)

权衡:

new_futures).

  • Troublesome期货的
  1. 开销(如果希望指定总体timeout.

,则重新添加服务员,构建pending

代码语言:javascript
代码运行次数:0
运行
复制
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(futures, return_when=FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures

有效途径

调整as_completed(...)以添加到fspending中,并使用waiter

交换:维护。

优点:如果需要,可以指定整个timeout

代码语言:javascript
代码运行次数:0
运行
复制
class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
        self.fs = fs            # +
        self.pending = pending  # +
        self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))

            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                                '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))

                waiter.event.wait(wait_timeout)

                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))

        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

用法:

代码语言:javascript
代码运行次数:0
运行
复制
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job

从工作助手处重试

等待eventswith ... executor:,因为ThreadPoolExecutor.__exit__关闭executor,所以它不能安排新的未来。

权衡:

如果要指定总体超时,则由于主process.

  • Troublesome中的executor引用,
  1. 将不与ProcessPoolExecutor一起工作。

代码语言:javascript
代码运行次数:0
运行
复制
def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()
票数 2
EN

Stack Overflow用户

发布于 2022-08-09 03:36:19

你说过:

,但如果它再次失败,我们将无法重新尝试,直到第二个工作完成。

但我不认为那是真的。守则规定:

代码语言:javascript
代码运行次数:0
运行
复制
    `for fut in concurrent.futures.as_completed(futures):`

这在futures中有点微妙,它不是用作字典,而是用作迭代器,特别是用于检查是否完成的期货迭代器。这个迭代器提供了一组期货来检查是否完成,然后as_completed()在它们完成时生成它们。因此,在你的病理性案例中,1s的工作确实能够在100年代的工作完成之前被重试多次。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73259393

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档