前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python|玩转 Asyncio 任务处理(2)

Python|玩转 Asyncio 任务处理(2)

作者头像
数据科学工厂
发布2024-07-05 13:37:31
730
发布2024-07-05 13:37:31
举报

引言

Python 的 Asyncio 模块在处理 I/O 密集型任务时表现出色,并且在最近的 Python 版本迭代中获得了诸多增强。不过,由于处理异步任务的途径多样,选择在特定情境下最合适的方法可能会让人感到迷惑。在这篇文章[1]中,我会先从任务对象的基本概念讲起,接着探讨各种处理异步任务的方法,并分析它们各自的优势和劣势。

等待多个任务

现在,让我们来看看有趣的事情 - 等待多个任务!等待任务集合主要有三种方式;每种方法都有其优点和缺点,并且在不同的情况下会有所帮助。

asyncio.wait

我们的第一个选项类似于 wait_for 函数,但它是为一组任务或更为基础的 Future 对象设计的,这些对象可以是列表、元组或集合等形式。

代码语言:javascript
复制
asyncio.wait(collection_of_tasks, *, timeout=None, return_when=ALL_COMPLETED)

此函数返回一个由两个集合组成的元组:第一个集合包含已完成的任务,第二个集合则包含尚未完成的任务。如果在超时期限或 return_when 参数指定的条件满足之前任务已完成,它们将被归入已完成的任务集合;未完成的任务则被放入第二个集合,这个集合通常被称作 pending,或者如果你不打算使用这些任务,也可以简单地用下划线 _ 来表示。然而,与 'asyncio.wait' 函数不同的是,在超时发生时,未完成的任务不会被自动取消。

return_when 参数允许你指定 asyncio.wait 函数在以下三种情况之一发生时返回:

  • FIRST_COMPLETED 当第一个任务完成或被取消时返回结果。
  • FIRST_EXCEPTION 当任一任务引发异常,或所有任务都已完成时返回结果。
  • ALL_COMPLETED 是默认选项,它将在所有 futures 完成或被取消时返回结果。

让我们通过一个实际例子来演示这个过程:

代码语言:javascript
复制
import asyncio
import random

async def job():
    await asyncio.sleep(random.randint(1, 5))

async def main():
    tasks = [
        asyncio.create_task(job(), name=index)
        for index in range(1, 5)
    ]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f’The first task completed was {done.pop().get_name()}’)

asyncio.run(main())

Output:

代码语言:javascript
复制
The first task completed was 4

asyncio.gather

现在,让我们深入了解 asyncio.gather 函数,特别是带有参数 return_exceptions=False 的用法。

与 wait_for 函数仅接受任务或Futuer对象的集合不同,gather 函数可以接受任意数量的任务、Futuer对象,甚至是协程对象,作为一系列位置参数传递给它。传入 gather 的协程对象会自动转换为任务对象,以便它们能够在事件循环中执行。所有任务完成后,gather 会将所有通过 Task.result() 方法获得的返回值,作为一个列表返回。gather 一个非常贴心的特性是,返回的列表会按照任务传入的顺序排列。

gather 的另一个优点是,它是这三个函数中唯一能够优雅地处理并返回异常的。如果设置了 return_exceptions 参数为 True,那么在任务原本应该返回结果的位置,列表将包含由任务引发的异常。

下面,让我们通过一个实例来具体了解这一机制是如何运作的。

代码语言:javascript
复制
import asyncio
import random

async def job(id):
    print(f’Starting job {id}’)
    await asyncio.sleep(random.randint(1, 3))
    print(f’Finished job {id}’)
    return id

async def main():
    # create a list of worker tasks
    coros = [job(i) for i in range(4)]
    
    # gather the results of all worker tasks
    results = await asyncio.gather(*coros)
    
    # print the results
    print(f’Results: {results}’)

asyncio.run(main())

我们首先定义了一个包含多个协程对象的列表,接着通过 * 操作符将这些协程对象作为位置参数展开,供 gather 函数处理。当我们对 gather 函数返回的对象进行等待(即调用 await),它就会开始执行这些任务,并一直运行直至所有任务完成。值得注意的是,尽管由于 await asyncio.sleep(random.randint(1,3)) 的调用导致任务以随机顺序完成,但 gather 返回的结果列表依然保持了我们最初传入任务的顺序。

代码语言:javascript
复制
Starting job 0
Starting job 1
Starting job 2
Starting job 3
Finished job 3
Finished job 0
Finished job 1
Finished job 2
Results: [0, 1, 2, 3]

在下一个示例中,我将两个协程直接放入 Gather 中,并将 return_exceptions 设置为 True,这会在同一结果列表中优雅地返回异常:

代码语言:javascript
复制
import asyncio

async def task1():
    raise ValueError()

async def task2():
    raise KeyError()

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # Will print [ValueError(), KeyError()]

asyncio.run(main())

asyncio.gather 的最后一个功能是,就像使用 Task.cancel() 取消单个任务一样,gather 返回的对象(然后等待)有自己的 cancel() 方法,该方法将循环遍历所有它正在管理的任务并取消所有这些任务。

asyncio.as_completed

这个函数与前面提到的两个有所不同;它不是一次性提供所有结果的集合或列表,而是提供了一个可迭代的对象,这样你可以在每个结果生成时即时处理它们。这个函数可以处理所有类型的可等待对象,包括协程、任务和未来对象。与其他许多方法类似,它也包含一个用于设置超时的关键字参数,如果到了设定的时间任务还没有完成,就会抛出 TimeoutError 异常。

代码语言:javascript
复制
asyncio.as_completed(aws, *, timeout=None)

以下是 as_completed 工作原理的示例:

代码语言:javascript
复制
import asyncio

async def my_task(id):
    return f’I am number {id}’


async def main():
    tasks = [my_task(id) for id in range(5)]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

我们可以看到输出是随机的,因为它只是打印出先完成的内容:

代码语言:javascript
复制
I am number 3
I am number 0
I am number 4
I am number 2
I am number 1

等待一组任务

在 Python 3.11 中,Asyncio 引入了一个新特性——asyncio.TaskGroup,它以上下文管理器的形式简化了对一组任务的管理。这个特性的一个关键优势在于,如果任务组中的某个任务遇到错误,其他所有任务都会自动取消,这有助于在异步编程中实现更加健壮的错误处理机制。

设想这样一个情形:你有两段代码,每段都负责调用不同的 API 接口。当这两个 API 接口的响应都收集齐后,你打算将这些数据统一存储到数据库中。但如果其中一个 API 调用失败,你希望整个数据插入操作都不要执行。这种情况下,使用 TaskGroup 就非常合适,因为它可以确保两个协程要么都完成,要么在其中一个失败时立即取消另一个。

你可以通过调用 tg.create_task() 方法来向任务组中添加任务。如果任务组中的任何一个任务失败,组内其他所有任务都将被取消。随后,异常会以 ExceptionGroup 或 BaseExceptionGroup 的形式传递到包含任务组的协程中。

以下是一个展示如何使用任务组的示例:

代码语言:javascript
复制
import asyncio

async def do_something():
    return 1

async def do_something_else():
    return 2

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(do_something())
        task2 = tg.create_task(do_something_else())

    print(f’Everything done: {task1.result()}, {task2.result()}’)

asyncio.run(main())

Output:

代码语言:javascript
复制
Everything done: 1, 2

总结

我们已经探讨了多种处理可等待对象(awaitables)的方法,现在来回顾一下:

  • await 是最基本的等待操作,你可以将它放在任何可等待对象前面来执行其内部的代码。但 await 不支持直接同时处理多个任务。
  • asyncio.wait_forawait 类似,用于处理单个可等待对象,但它允许设置超时,适用于长时间运行的任务。
  • asyncio.wait 接受一组任务或未来对象,并允许设置超时。你可以根据需求选择返回的时机,例如所有任务完成、第一个任务完成或遇到第一个异常。
  • asyncio.gather 接受多个可等待对象作为位置参数,并返回一个列表,列表中的顺序与传入的参数顺序相同。它还能处理那些抛出异常的任务。
  • asyncio.as_completed 提供了一个可迭代的方式,允许你逐个处理完成的任务,而不是一次性处理所有任务。它同样支持超时参数。
  • asyncio.TaskGroup 是 Python 3.11 新增的特性,它让你可以管理一组任务,并根据是否有任务抛出异常来决定是否全部或一个也不返回结果。

Reference

[1]

Source: https://jacobpadilla.com/articles/handling-asyncio-tasks

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 冷冻工厂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 等待多个任务
    • asyncio.wait
      • asyncio.gather
        • asyncio.as_completed
        • 等待一组任务
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档