此前的文章中,我们详细介绍了 python 中的协程。 python 的协程
协程是在用户进程中,按照用户预先设定的执行流程进行上下文切换,从而在开销远小于多线程/多进程并发的条件下实现程序的并发执行。 asyncio,tornado 和 gevent 在 python 原有协程机制的基础上封装了更为易用的高层次 api,本文我们就来详细介绍 asyncio 包基于协程实现的异步 IO。
asyncio 实现了一个协程库,他具有下面几个组件:
协程是在用户进程中进行上下文切换实现的,与多线程/多进程并发执行的本质区别是没有操作系统来执行调度。 在 asyncio 中,事件循环就充当了操作系统的角色,负责调度在事件循环上注册的协程函数。
协程对象是通过 async 关键字定义的函数,他需要被注册到事件循环上,在事件循环执行过程中进行调用。
一个协程对象就是一个原生可以挂起的函数。 任务时对协程的进一步封装,其中记录了任务的状态等信息。
我们此前已经介绍过 future 对象,他是一个用来表示将来会被执行的任务的对象。 正如我们之前提到,python 标准库中,在两个包中封装了 Future 类:
在两个包中封装的 Future 类本质上和用法上都是非常接近的。
async 关键字用于定义一个协程方法。 await 关键字则用于挂起阻塞的异步调用接口。 他们都是 python3.5 引入的关键字。
可以被加入事件循环的对象就是可等待对象,分为三种类型:
import timeimport asyncio
async def do_some_work(x):
print('Waiting: ', x)if __name__ == '__main__':
now = lambda: time.time()
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('TIME: ', now() - start)
上面的例子中,我们首先通过 asyncio 的 get_event_loop 方法创建了事件循环,然后将被 async 关键字标记的协程方法注册到事件循环中,事件循环负责调起该方法。 方法顺利执行,打印出了:
Waiting: 2 TIME: 0.002991914749145508
创建事件循环看上去非常繁琐,python3.7 引入了 asyncio.run 方法,让你可以省去这个操作:
import timeimport asyncio
async def do_some_work(x):
print('Waiting: ', x)if __name__ == '__main__':
now = lambda: time.time()
start = now()
asyncio.run(do_some_work(2))
print('TIME: ', now() - start)
由于该方法目前属于“暂定基准状态”,所以本文仍然使用上述事件循环方式启动和运行协程。
asyncio 中 Task 对象是 Future 对象的子类。 上面的例子中,事件循环的 run_until_complete 方法实际上是将我们的协程方法封装成了 Task 对象并运行。 我们也可以显式手动创建 Task 对象,这样最大的好处在于我们可以对协程方法进行更为灵活的控制,例如监控任务执行的状态等。
Task 和 Future 对象一样,拥有四种执行状态:
我们有三种方法创建一个 task 对象,下文中,变量 coroutine 表示我们的协程方法对象:
import asyncioimport loggingimport time
async def say_after(delay, what):
await asyncio.sleep(delay)
logging.info(what) return whatif __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
print(f"started at {time.strftime('%X')}")
loop = asyncio.get_event_loop()
task = loop.create_task(say_after(2, 'world'))
logging.info(task)
loop.run_until_complete(task)
logging.info(task)
print(f"finished at {time.strftime('%X')}")
打印出了:
started at 20:55:11 2019-07-11 20:55:11,769 - INFO: <Task pending coro=<say_after() running at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6>> 2019-07-11 20:55:13,771 - INFO: world 2019-07-11 20:55:13,771 - INFO: <Task finished coro=<say_after() done, defined at C:/Debin/Workspace/code/python/fluentpython/testaisyncio/task_demo.py:6> result=’world’> finished at 20:55:13
cancel()
Task 对象具有 cancel 方法,允许我们取消一个已经提交到事件循环,但尚未完成的任务。 该 Task 对象的协程函数会抛出 CancelledError 异常。 如果在协程中捕获 CancelledError 异常,取消将会被抑制,但这是不推荐的做法。
更为推荐的方法是 asyncio.shield 方法:
asyncio.shield(arg, *, loop=None)
arg 是一个协程方法。 这个方法用于设置该对象屏蔽 cancel 方法:
res = await something() # 未屏蔽 cancelres = await shield(something()) # 屏蔽 cancel
Task 作为 Future 的子类,也同样具有 Future 的 result 方法,实现阻塞等待并获取返回。
import asyncioimport loggingimport time
async def say_after(delay, what):
await asyncio.sleep(delay)
logging.info(what) return whatif __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
print(f"started at {time.strftime('%X')}")
loop = asyncio.get_event_loop()
task = loop.create_task(say_after(2, 'world'))
loop.run_until_complete(task)
print(f"finished by {task.result()} at {time.strftime('%X')}")
打印出了:
started at 21:56:12 2019-07-11 21:56:14,720 - INFO: world finished by world at 21:56:14
通过 add_done_callback 方法,我们可以将一个回调方法绑定到 Task 对象上,一旦任务完成运行,会自动以一个 Future 对象为参数调用预设的 callback 方法。
import asyncioimport loggingimport time
async def say_after(delay, what):
await asyncio.sleep(delay)
logging.info(what) return whatdef callback(future):
logging.info(f"task callback for {future.result()}")if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
print(f"started at {time.strftime('%X')}")
loop = asyncio.get_event_loop()
task = loop.create_task(say_after(2, 'world'))
task.add_done_callback(callback)
loop.run_until_complete(task)
print(f"finished at {time.strftime('%X')}")
打印出了:
started at 21:58:42 2019-07-11 21:58:44,955 - INFO: world finished at 21:58:44 2019-07-11 21:58:44,955 - INFO: task callback for world
我们也可以在调用 add_done_callback 后,通过相同参数调用 remove_done_callback 方法来取消回调。
上面的例子中,我们使用了 asyncio.sleep 方法:
coroutine asyncio. sleep(delay, result=None, *, loop=None)
这个方法与 time.sleep 基本一致,都是挂起当前任务,阻塞 delay 指定的秒数,阻塞中允许其他任务运行。 不同之处在于,如果传递了 result,则会在协程完成时将其返回给调用者。 最后一个参数 loop 已经被废弃,预计将于 python3.10 移除。
使用协程最重要的当然是并发运行任务,asyncio 包中,gather 方法就是用来并发运行我们的一系列协程对象的。
awaitable asyncio. gather(*aws, loop=None, return_exceptions=False)
gather 返回的同样是一个可等待对象,可以通过调用该对象的 cancel 方法取消,所有通过 gather 方法提交但尚未完成的可等待对象也会被取消。
import asyncioimport logging
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
logging.info(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
logging.info(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
打印出了:
2019-07-11 21:06:43,802 - INFO: Task A: Compute factorial(2)… 2019-07-11 21:06:43,803 - INFO: Task B: Compute factorial(2)… 2019-07-11 21:06:43,803 - INFO: Task C: Compute factorial(2)… 2019-07-11 21:06:44,804 - INFO: Task A: factorial(2) = 2 2019-07-11 21:06:44,804 - INFO: Task B: Compute factorial(3)… 2019-07-11 21:06:44,805 - INFO: Task C: Compute factorial(3)… 2019-07-11 21:06:45,805 - INFO: Task B: factorial(3) = 6 2019-07-11 21:06:45,805 - INFO: Task C: Compute factorial(4)… 2019-07-11 21:06:46,806 - INFO: Task C: factorial(4) = 24