前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过 asyncio 实现基于协程的并发编程

通过 asyncio 实现基于协程的并发编程

作者头像
用户3147702
发布2022-06-27 13:38:31
5680
发布2022-06-27 13:38:31
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

此前的文章中,我们详细介绍了 python 中的协程。 python 的协程

协程是在用户进程中,按照用户预先设定的执行流程进行上下文切换,从而在开销远小于多线程/多进程并发的条件下实现程序的并发执行。 asyncio,tornado 和 gevent 在 python 原有协程机制的基础上封装了更为易用的高层次 api,本文我们就来详细介绍 asyncio 包基于协程实现的异步 IO。

2. asyncio 的构成

asyncio 实现了一个协程库,他具有下面几个组件:

2.1. 事件循环 — event_loop

协程是在用户进程中进行上下文切换实现的,与多线程/多进程并发执行的本质区别是没有操作系统来执行调度。 在 asyncio 中,事件循环就充当了操作系统的角色,负责调度在事件循环上注册的协程函数。

2.2. 协程 — coroutine

协程对象是通过 async 关键字定义的函数,他需要被注册到事件循环上,在事件循环执行过程中进行调用。

2.3. 任务 — task

一个协程对象就是一个原生可以挂起的函数。 任务时对协程的进一步封装,其中记录了任务的状态等信息。

2.4. future

我们此前已经介绍过 future 对象,他是一个用来表示将来会被执行的任务的对象。 正如我们之前提到,python 标准库中,在两个包中封装了 Future 类:

  1. concurrent
  2. asyncio

在两个包中封装的 Future 类本质上和用法上都是非常接近的。

2.5. async/await 关键字

async 关键字用于定义一个协程方法。 await 关键字则用于挂起阻塞的异步调用接口。 他们都是 python3.5 引入的关键字。

2.6. 可等待对象

可以被加入事件循环的对象就是可等待对象,分为三种类型:

  1. async 关键字标识的协程对象
  2. Task 对象
  3. Future 对象

3. 通过事件循环运行协程

代码语言:javascript
复制
import timeimport asyncio

async def do_some_work(x):
    print('Waiting: ', x)if __name__ == '__main__':
    now = lambda: time.time()

    start = now()
    coroutine = do_some_work(2)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)

    print('TIME: ', now() - start)

上面的例子中,我们首先通过 asyncio 的 get_event_loop 方法创建了事件循环,然后将被 async 关键字标记的协程方法注册到事件循环中,事件循环负责调起该方法。 方法顺利执行,打印出了:

Waiting: 2 TIME: 0.002991914749145508

3.1. python3.7 的优化

创建事件循环看上去非常繁琐,python3.7 引入了 asyncio.run 方法,让你可以省去这个操作:

代码语言:javascript
复制
import timeimport asyncio

async def do_some_work(x):
    print('Waiting: ', x)if __name__ == '__main__':
    now = lambda: time.time()

    start = now()
    asyncio.run(do_some_work(2))

    print('TIME: ', now() - start)

由于该方法目前属于“暂定基准状态”,所以本文仍然使用上述事件循环方式启动和运行协程。

4. 创建任务

asyncio 中 Task 对象是 Future 对象的子类。 上面的例子中,事件循环的 run_until_complete 方法实际上是将我们的协程方法封装成了 Task 对象并运行。 我们也可以显式手动创建 Task 对象,这样最大的好处在于我们可以对协程方法进行更为灵活的控制,例如监控任务执行的状态等。

4.1. 任务的状态

Task 和 Future 对象一样,拥有四种执行状态:

  1. Pending — 等待执行
  2. Running — 执行中
  3. Done — 完成执行
  4. Canceled — 已被取消

4.2. 创建 Task 对象

我们有三种方法创建一个 task 对象,下文中,变量 coroutine 表示我们的协程方法对象:

  1. 通过事件循环对象 — loop.create_task(coroutine)
  2. asyncio.ensure_future — asyncio.ensure_future(coroutine)
  3. asyncio.create_task — 仅限 python3.7 及以上,asyncio.ensure_future(coroutine)

4.2.1. 示例

代码语言:javascript
复制
import asyncioimport loggingimport time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    logging.info(what)    return whatif __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    print(f"started at {time.strftime('%X')}")
    loop = asyncio.get_event_loop()
    task = loop.create_task(say_after(2, 'world'))
    logging.info(task)
    loop.run_until_complete(task)
    logging.info(task)
    print(f"finished at {time.strftime('%X')}")

打印出了:

started at 20:55:11 2019-07-11 20:55:11,769 - INFO: <Task pending coro=<say_after() running at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6>> 2019-07-11 20:55:13,771 - INFO: world 2019-07-11 20:55:13,771 - INFO: <Task finished coro=<say_after() done, defined at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6> result=’world’> finished at 20:55:13

4.3. 任务的取消 — cancel

cancel()

Task 对象具有 cancel 方法,允许我们取消一个已经提交到事件循环,但尚未完成的任务。 该 Task 对象的协程函数会抛出 CancelledError 异常。 如果在协程中捕获 CancelledError 异常,取消将会被抑制,但这是不推荐的做法。

4.4. 屏蔽取消 — asyncio.shield

更为推荐的方法是 asyncio.shield 方法:

asyncio.shield(arg, *, loop=None)

arg 是一个协程方法。 这个方法用于设置该对象屏蔽 cancel 方法:

代码语言:javascript
复制
res = await something() # 未屏蔽 cancelres = await shield(something()) # 屏蔽 cancel

4.5. Task 结果的获取 — result 方法与回调

4.5.1. result 方法

Task 作为 Future 的子类,也同样具有 Future 的 result 方法,实现阻塞等待并获取返回。

代码语言:javascript
复制
import asyncioimport loggingimport time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    logging.info(what)    return whatif __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    print(f"started at {time.strftime('%X')}")
    loop = asyncio.get_event_loop()
    task = loop.create_task(say_after(2, 'world'))
    loop.run_until_complete(task)
    print(f"finished by {task.result()} at {time.strftime('%X')}")

打印出了:

started at 21:56:12 2019-07-11 21:56:14,720 - INFO: world finished by world at 21:56:14

4.5.2. 绑定回调 — add_done_callback

通过 add_done_callback 方法,我们可以将一个回调方法绑定到 Task 对象上,一旦任务完成运行,会自动以一个 Future 对象为参数调用预设的 callback 方法。

代码语言:javascript
复制
import asyncioimport loggingimport time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    logging.info(what)    return whatdef callback(future):
    logging.info(f"task callback for {future.result()}")if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    print(f"started at {time.strftime('%X')}")
    loop = asyncio.get_event_loop()
    task = loop.create_task(say_after(2, 'world'))
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    print(f"finished at {time.strftime('%X')}")

打印出了:

started at 21:58:42 2019-07-11 21:58:44,955 - INFO: world finished at 21:58:44 2019-07-11 21:58:44,955 - INFO: task callback for world

4.5.3. 取消绑定回调 — remove_done_callback

我们也可以在调用 add_done_callback 后,通过相同参数调用 remove_done_callback 方法来取消回调。

5. 执行休眠 — asyncio.sleep

上面的例子中,我们使用了 asyncio.sleep 方法:

coroutine asyncio. sleep(delay, result=None, *, loop=None)

这个方法与 time.sleep 基本一致,都是挂起当前任务,阻塞 delay 指定的秒数,阻塞中允许其他任务运行。 不同之处在于,如果传递了 result,则会在协程完成时将其返回给调用者。 最后一个参数 loop 已经被废弃,预计将于 python3.10 移除。

6. 并发执行协程 — asyncio.gather

使用协程最重要的当然是并发运行任务,asyncio 包中,gather 方法就是用来并发运行我们的一系列协程对象的。

awaitable asyncio. gather(*aws, loop=None, return_exceptions=False)

6.1. 说明

6.1.1. 参数

  1. aws — 可等待对象集合
  2. loop — 该参数已被废弃
  3. return_exceptions — 是否等待返回时抛出异常,为 False 会立即抛出异常,否则在所有可等待对象运行完成后将异常聚合至结果列表返回

6.1.2. 返回

gather 返回的同样是一个可等待对象,可以通过调用该对象的 cancel 方法取消,所有通过 gather 方法提交但尚未完成的可等待对象也会被取消。

6.2. 示例

代码语言:javascript
复制
import asyncioimport logging

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        logging.info(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    logging.info(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*: 
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

打印出了:

2019-07-11 21:06:43,802 - INFO: Task A: Compute factorial(2)… 2019-07-11 21:06:43,803 - INFO: Task B: Compute factorial(2)… 2019-07-11 21:06:43,803 - INFO: Task C: Compute factorial(2)… 2019-07-11 21:06:44,804 - INFO: Task A: factorial(2) = 2 2019-07-11 21:06:44,804 - INFO: Task B: Compute factorial(3)… 2019-07-11 21:06:44,805 - INFO: Task C: Compute factorial(3)… 2019-07-11 21:06:45,805 - INFO: Task B: factorial(3) = 6 2019-07-11 21:06:45,805 - INFO: Task C: Compute factorial(4)… 2019-07-11 21:06:46,806 - INFO: Task C: factorial(4) = 24

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. asyncio 的构成
    • 2.1. 事件循环 — event_loop
      • 2.2. 协程 — coroutine
        • 2.3. 任务 — task
          • 2.4. future
            • 2.5. async/await 关键字
              • 2.6. 可等待对象
              • 3. 通过事件循环运行协程
                • 3.1. python3.7 的优化
                • 4. 创建任务
                  • 4.1. 任务的状态
                    • 4.2. 创建 Task 对象
                      • 4.2.1. 示例
                    • 4.3. 任务的取消 — cancel
                      • 4.4. 屏蔽取消 — asyncio.shield
                        • 4.5. Task 结果的获取 — result 方法与回调
                          • 4.5.1. result 方法
                          • 4.5.2. 绑定回调 — add_done_callback
                          • 4.5.3. 取消绑定回调 — remove_done_callback
                      • 5. 执行休眠 — asyncio.sleep
                      • 6. 并发执行协程 — asyncio.gather
                        • 6.1. 说明
                          • 6.1.1. 参数
                          • 6.1.2. 返回
                        • 6.2. 示例
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档