前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >以定时器为例研究一手 Python asyncio 的协程事件循环调度

以定时器为例研究一手 Python asyncio 的协程事件循环调度

作者头像
菜皮日记
发布2024-08-11 09:27:17
1150
发布2024-08-11 09:27:17
举报
文章被收录于专栏:菜皮日记

在使用 Python 的 asyncio 库实现异步编程的过程中,协程与事件循环这两个概念可以说有着千丝万缕的联系,常常是形影不离的出现,如胶似漆般的存在,asyncio 库到底是如何调度协程的? 下面以 Python 3.8 中的 asyncio.sleep 定时器为例研究一手 asyncio 的源码实现。

几个主要的概念

首先需要对 asyncio 中的几个主要函数和模块做一个初步认识:

  • asyncio.run 是启动事件循环的入口,接收一个协程作为参数。
  • asyncio.BaseEventLoop 就是事件循环基类了,子类常用的是 _UnixSelectorEventLoop,但核心调度逻辑都在基类中,其中最主要的是 run_forever 函数用来启动事件循环;另一个主要的函数是 create_task ,用来创建一个 Task 对象并放到事件循环中,准备在下一次循环时执行。
  • asyncio.events.Handleasyncio.events.TimerHandle 是放到 loop 中的处理对象,其中 _callback 属性保存的是一个回调函数,处理对象执行时调用的就是这个函数,回调函数参数放在_args 属性中。
  • asyncio.futures.Future 作为一个事件在未来完成的占位符,当事件完成后可通过 Future.set_result 方法将事件的结果设置进去。
  • asyncio.tasks.TaskFuture 类的子类,可以理解为是对协程的包装,在 Future 基础上增加了启动协程和恢复协程的能力,主要逻辑在 Task.__step 函数中。

从简单例子开始

先从最简单的一段代码开始

这段代码启动一个 main 协程,协程输出两行内容后完成结束,这里先不加入任何 await 异步操作,主要看一下事件循环是怎样初始化和启动的,只保留了关键代码。

loop 的初始化

首先看 asyncio.run 函数,内容比较简单,初始化一个事件循环 loop,然后调用 loop.run_until_complete(main) 启动并传入 main 协程。

Task 的初始化

接着来到 asyncio.base_events.BaseEventLoop.run_until_complete,首先调用了 asyncio.tasks.ensure_future 函数,目的是将传入的 main 协程转换成一个 Task 对象,在创建 Task 的过程中会将 Task 对象加入到 loop 的队列中,之后调用 self.run_forever 启动事件循环。

确切的说应该是将 Task.__step 函数包装到 Handle 对象中,之后加入到 loop 的队列中,稍后会看到这个细节。

再看一下 Task.__init__,其中 _coro 保存了传入的协程 coro 对象,实际上可以将 Task 视为一个协程的包装,在初始化的后面调用了 loop.call_soon(self.__step, context=self._context) 函数,将 Task 对象自己的 __step 函数加入到 loop 队列,当 loop 启动后便会执行这个函数。

再看一下 loop.call_soon 做了什么,接受一个 callback 参数,在这里就代表 Task.__step,接着会调用 _call_soon 函数,在 _call_soon 函数中初始化了 events.Handle 对象,然后将 handle 对象加入到 loop._ready 队列中。

在看一眼 Handle 的初始化,主要就是将 callback 保存下来,并且用 args 表示 callback 的参数。

Handle 的一个主要的函数是 _run,当 loop 启动后会从 loop._ready 队列中取出 Handle 执行,执行的就是 _run 函数,_run 函数中 self._context.run(self._callback, *self._args) 其实就是在原有 context 环境下执行回调函数并传入 args 参数。

到这里先总结一下,通过 asyncio.run(main()) 添加了一个协程,然后将协程 main 包装成 Task,并将 Task.__step 包装成 Handle 放到 loop._ready 队列中,接下来就是真正启动 loop 了。

loop 的启动

asyncio.base_events.BaseEventLoop.run_until_complete,在封装完 main 协程后会先添加一个回调函数 _run_until_complete_cb,回调函数会在 main 协程执行完后执行,内容就是将 loop 设置成关闭。

接着的 run_forever 函数就是启动 loop 了。

run_forever 中做了一些初始检查和设置,然后进入 while 循环并在循环中调用 _run_once_run_once 就是一次事件循环的核心调度逻辑了。

loop 调度的核心逻辑

核心调度逻辑在 _run_once 中。loop 主要有两个队列存放协程任务对应的 Handle,一个是 _scheduled 用来存放定时类协程,它是一个最小堆实现的优先队列,例如使用 asyncio.sleep 就会存进去一个 TimerHandle 对象;另一个是 _ready 用来存放准备好执行的协程,而 _scheduled 中有准备好的协程会取出来放入 _ready 中,loop 最终执行 Handle 都是从 _ready 中取出的。

_run_once 中做的事情分四个部分,第一部分是清理 _scheduled;第二部分是调用多路复用 IO 并处理就绪的描述符;第三部分是将到期的 _scheduled 转移到 _ready;第四部分遍历 _ready 并逐一启动处理函数 handle._run;

Handle._run 没啥说的,直接调用 Handle._callback,并且将 Handle._args 作为参数传进去。

还记得 loop 是怎么启动的吗?将 main 协程包装成 Task,在创建 Task 时将 Task.__step 作为 callback 生成了一个 Handle 并放到了 loop._ready 中,所以这里 Handle._run 其实执行的就是根据 main 协程生成出来的 Task.__stepTask.__step 是协程启动和协程暂停恢复的关键

协程的启动

Task._coro 属性保存了协程,通过 result = coro.send(None) 启动协程,由此进入到 main 协程中,打印出 main startmain end

之后 main 协程结束,抛出 StopIteration 异常,调用 super().set_result(exc.value)Task._result 设置结果并将 _state 标记成 _FINISHED,之后调用 __schedule_callbacks 触发 Task 上注册的回调函数,在这里 mian 协程注册的就是 _run_until_complete_cb 用来结束 loop 的,将回调函数放在传给 loop.call_soon 等待下一轮事件循环来触发。

到这里就可能看到一个协程是如何传给 loop 并启动的了,也知道了 loop 的大概流程。下面在 main 中加入 asyncio.sleep 看看定时器是如何调度的。

asyncio.sleep 如何定时

main 中加入一个 asyncio.sleep 看看定时是如何实现的

loop 的初始化和启动还是一样的,直接看看 Task.__step 是如何调度的,其中调用 result = coro.send(None) 会启动协程,首先输出 main start,然后调用 asyncio.sleep(3)

协程的挂起

首先常见一个空的 Future 对象 future,然后调用的 loop.call_later(delay, futures._set_result_unless_cancelled, future, result),然后一路向下调用 loop.call_at,最后生成了一个 TimerHandle 对象 push 进 loop._scheduled 堆中。

TimerHandle 其实就比 Handle 多了个 _when 属性表示何时可以恢复运行,当时间到了会调用 TimerHandle._run 执行 TimerHandlecallback,也就是 _set_result_unless_cancelled(future, result) 用来给 future 设置结果。

asyncio.sleep 的函数签名是 asyncio.sleep(delay, result=None),一般不传第二个参数所以结果是 None,如果传的话之后会将结果设置到 future 对象里面。

asyncio.sleep 函数的最后将 future 返回并挂起自己,控制权又交还给 Task.__stepresult = coro.send(None) 的位置,result 接到的就是 future 对象。

result 接到 future 后向下执行到 result.add_done_callback(self.__wakeup, context=self._context)future 设置一个回调函数 Task.__wakeup,到这里本轮循环就结束了。

到目前为止 loop 的状态是 _scheduled 堆中有一个 TimerHandle 对象,对象的 _when 表示剩余启动的秒数,对象的 _callback 指向的是 futures._set_result_unless_cancelled 参数是一个 future,这个 futurecallbacks 回调列表中有一个 main 协程生成的 Task.__wakeup

协程的恢复

本轮循环结束,下一轮循环时会检查 loop._scheduled 发现 TimerHandle 已经到期,将其放到 loop._ready 队列中,紧接着就取出执行 TimerHandle._run,也就是执行 futures._set_result_unless_cancelled(future, None),其实就是给 future 设置结果、标记完成、执行 future 的回调函数。

还记得 future 是怎么来的以及 future 里面是啥吗?future 是在 asyncio.sleep 时生成并通过 await 返回的,返回给 Task.__step 后通过 add_done_callback(self.__wakeup) 为其添加了一个回调函数。

所以到此为止干的事儿就是遍历 futurecallbacks 逐一通过 loop.call_soon() 添加到 loop 中,等待下一轮事件循环执行,这里添加的就是 main Task__wakeup 函数。

进入下一轮循环,loop._ready 中有一个 Handle,其内部的 _coro 代表的是 main Task__wakeup,取出来执行 Handle._run 实际上就是执行 main Task.__wakeup

__wakeup 也很简单就是确认 future 是已完成状态并调用 __step,控制权有交给了之前挂起的 main Task

Task.__step 再一次执行到 result = coro.send(None) 时,便会恢复之前的 sleep 协程接着执行 return,回到了 main 函数中,继续执行并输出 main end最后完成,抛出 StopIteration 异常,被 Task.__step 捕获,整个协程结束,之后事件循环做收尾工作也关闭,事件循环也关闭,到这里整个程序就结束了。

总结

asyncio 中的定时通过 asyncio.sleep 实现,原理是在事件循环中维护一个最小堆实现的优先队列 _scheduled,其中保存的都是定时任务处理对象 Handle,越早到期 Handle 就会越早被取出来并加入到 loop._ready 队列,在下一轮循环时取出并从挂起的位置恢复执行。

由于协程代码在执行时会切换控制权导致代码逻辑跳来跳去,有时会被绕晕,借助定时器的调度可以让整个事件循环的逻辑更加清晰。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 几个主要的概念
  • 从简单例子开始
    • loop 的初始化
      • Task 的初始化
        • loop 的启动
          • loop 调度的核心逻辑
            • 协程的启动
            • asyncio.sleep 如何定时
              • 协程的挂起
                • 协程的恢复
                • 总结
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档