深入python协程的实现,带你一层一层揭开协程的神秘面纱!

设计思路

EventLoop和torando的IOLoop一样,使用IO多路复用来阻塞循环,使用一个Waker来唤醒EventLoop,这点我是照抄的torando,不同之处在于EventLoop中没有回调函数列表,而是拥有一个协程字典。

协程字典是由一堆协程对象作为key组成的。协程类继承于Torando::Future,本质上协程对象也是一个Future对象,不过更进一步的是,他同时还是一个生成器代理。key对应的value为每次yield返回的值。

协程类是通过一个装饰器创建的,代码如下:

def coroutine(func): def __init__(self, *args, **kwargs): super(self.__class__, self).__init__() self.gen = func(*args, **kwargs) def send(self, result): try: return self.gen.send(result) except (GeneratorExit, StopIteration) as e: self.set_result(e.args[0]) raise def throw(self, typ, val=None, tb=None): return self.gen.throw(typ, val=None, tb=None) def __iter__(self): return self.gen def __await__(self): yield from self cls = type(func.__name__, (Future, Coroutine, Iterable), {"__init__": __init__, "send": send, "throw": throw, "__iter__": __iter__, "__await__": __await__}) return cls

和当IOLoop一样,EventLoop开始事件循环后,每当有协程对象加入到事件循环中,都会被唤醒,执行循环,循环会处理监听到的文件描述符,调用其handler,处理timout对象。接下来处理的协程字典中的协程是IOLoop所没有的操作。

处理协程的方法如下:

def process_coroutine(self, coroutine, yielded): while True: try: # 如果当前yield值为Future对象,则判断Future是否完成,进而调用send方法发结果,否则中断本协程的进行。 # 如果当前yield值不是Future对象, 直接send当前值。 # 如果期间发生协程运行完毕,则删除该协程。 if isinstance(yielded, Future): if yielded.done(): yielded = self.coroutines[coroutine] = \ coroutine.send(yielded.result()) else: break else: yielded = self.coroutines[coroutine] = coroutine.send(yielded) except (GeneratorExit, StopIteration): del self.coroutines[coroutine] break

为什么要这么实现呢?

假设我们现在拥有一个简单的协程

demo0

# 通过coroutine装饰器,该协程会被变成一个协程子类。 @coroutine def normal(): a = yield 1 print("Get a: %s" % a) b = yield 2 print("Get b: %s" % b) c = yield 3 print("Get c: %s" % c) return c coroutine0 = normal() # 创建一个协程对象 loop = EventLoop() # 创建事件循环 loop.run_until_complete(coroutine0)# 开启事件循环直到完成 print(coroutine0.result()) # 打印输出

以上代码大致会经过如下的操作:

创建一个协程对象

创建事件循环

将协程对象添加到协程字典中

开启事件循环

事件循环运行到process_coroutine,此时yielded在process_coroutine中由倒数第4行代码依次变成1, 2, 3(因为协程中的yield返回的不是future类型),并通过send方法赋值给了a, b, c。直到最后抛出StopIteration。并删除了该协程

运行结束result方法可以调用了返回return值

通过这个简单的协程我们大致了解了整个程序的运行流程。但这种协程并没有什么卵用,而且把一件简单的事情弄的非常复杂。接下来再更进一步介绍一个相对复杂的协程。

我们先看一个阻塞函数:

def sleep(seconds): future = Future() def callback(): future.set_result(None) EventLoop().call_at(time.time() + seconds, callback) yield future

这个函数和tornado的sleep函数差不多,函数创建了一个future对象,并在EventLoop中添加了一个Timeout对象,Timeout对象会在指定时间后后被事件循环调用其callback。在这里,被callback之后,future被set_result,也就是说Future.done()变为了True(记住这一点,后面会用到,很重要)。

下面再写一个复杂一点的的协程

demo1

@coroutine def sum(a, b): print("Sum start. %s + %s" % (a, b)) yield from sleep(1) result = yield a + b print("Sum stop. %s + %s" % (a, b)) return result coroutine1 = sum(1, 2) # 创建一个协程对象 loop = EventLoop() # 创建事件循环 loop.run_until_complete(coroutine1)# 开启事件循环直到完成 print(coroutine1.result()) # 打印输出

以上代码大致会经过如下的操作:

创建一个协程对象

创建事件循环

将协程对象添加到协程字典中

开启事件循环

事件循环运行到process_coroutine,sleep返回一个Future对象,并且调用其done方法发现没有完成。于是跳出循环,回到事件循环。

事件循环会获取timeouts中所有Timeout对象的最小延迟时长并做为超时时长挂起。这时候整个世界都停止了。

等超时之后,处理timeouts中Timeout对象,发现sleep结束了,调用其callback,future对象这时被置为完成。

事件循环继续前进到达process_coroutine,此时调用future的done方法发现完成了。send future.result(),继而执行a + b。随后的情况和第一个例子无异了。

在这个例子中我们定义了一个sleep子协程,并学习到了如何实现阻塞。

再来看第三个更复杂的例子

demo2

@coroutine def sum(a, b): print("Sum start. %s + %s" % (a, b)) yield from sleep(1) result = yield a + b print("Sum stop. %s + %s" % (a, b)) return result @coroutine def multi(a, b): print("Multi start. %s x %s" % (a, b)) yield from sleep(2) result = yield a * b print("Multi stop. %s x %s" % (a, b)) return result @coroutine def aaddbthenmutilc(a, b, c): sum_result = yield from sum(a, b) multi_result = yield from multi(sum_result, c) return multi_result coroutine2 = aaddbthenmutilc() # 创建一个协程对象 loop = EventLoop() # 创建事件循环 loop.run_until_complete(coroutine2)# 开启事件循环直到完成 print(coroutine2.result()) # 打印输出

刚才说了协程也是一个future,但协程本身还是一个生成器。所以当我们返回协程时,使用yield from 代替yield。

当使用yield from 时生成器和子生器之间就建立起了一个长长的管道,子生器生成的值可以直接返回,无论链接多少个生成器,就好像直接调用一个生成器一样。。

在这里coroutine2中调用了sum子协程。一但其结束,sum的值会被赋值给sum_result,coroutine2继续执行multi,直到结束。

写到现在,可能有同学会问了,如果是真正的IO阻塞,怎么处理。比如socket。别急,下面马上给出第4个例子。

在此之前,我又实现了一个类似于sleep的阻塞函数

def get_buffer(socket): future = Future() def callback(fd_obj, events): future.set_result(fd_obj.recv(1024)) EventLoop().add_handler(socket, callback, EventLoop.READ) buffer = yield future return buffer

这个函数是专门用来处理socket读产生的阻塞的,创建一个future,并创建一个回调函数,添加到事件循环中,EventLoop中有一组对象被称为handlers,是用来处理文件描述符的。这个和tornado的IOLoop保持一致。通过add_handler可以添加。

在这里,socket被监听了READ事件。

demo3

@coroutine def socket_coroutine(): print("socket_coroutine start. ") import socket from toolkit.event_loop import get_buffer client = socket.socket() client.connect(("", 1234)) buffer = yield from get_buffer(client) client.close() print("socket_coroutine stop. ") return buffer coroutine3 = socket_coroutine() # 创建一个协程对象 loop = EventLoop() # 创建事件循环 loop.run_until_complete(coroutine3)# 开启事件循环直到完成 print(coroutine3.result()) # 打印输出

同时我开启一个server

In [3]: def serve(): ...: server = socket.socket() ...: server.bind(("", 1234)) ...: server.listen(0) ...: client, addr = server.accept() ...: import time ...: time.sleep(5) ...: client.send(b"hello world") ...: client.close() ...: server.close() ...: In [4]: serve()

这个例子和sleep差不多,不同之处在于sleep让事件循环停止了一段时间,而通过IO多路复用则是在被监听的文件描述符发生读事件时激活了事件循环。之后的步骤和demo1一样。

下面还有最后一个例子,就是多个协程一起运行。

demo4

不贴代码了,太长,代码在此!https://github.com/ShichaoMa/toolkit/blob/master/test/test_event_loop.py

里面有6个协程。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181028A1DZW900?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券