实现异步最经典的方法是起一个线程,然后调用回调函数。在python中的yield关键字,可以简单的切换代码的上下文。这为优雅的实现异步提供了可能。
在python中,也能使用协程来进行任务的处理。由于python不能利用多核优势,协程在某种程度上比线程的效率更高。然而,在协程中,任务不能是阻塞的。因为协程的任务不能并行。阻塞一个任务将阻塞住整个流程。
import time
import asyncio
async def money_counter():
time.sleep(5)
return 1000
async def guest(username):
print("guest {} ask for money".format(username))
money = await money_counter()
print("get money {}".format(money))
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(guest("yzh")),
asyncio.ensure_future(guest("zhh"))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
print("continue")
input()
# 很遗憾,结果是这样的:
# guest yzh ask for money
# get money 1000
# guest zhh ask for money
# get money 1000
# continue
上面的示例中由于使用的是sleep,并不能节省时间。但是如果换成IO操作(asyncio.open_connection),则可以提升性能。
当然不是 我们要用Loop的函数处理ThreadPoolExecutor对象,让它能把future的result传回await。 在Py的异步中起线程调用阻塞函数通常没有什么意义。 看代码吧:
import time
import asyncio
from concurrent import futures
thread_pool = futures.ThreadPoolExecutor(4)
def money_counter():
time.sleep(2)
return 1000
async def guest(username):
print("guest {} ask for money".format(username))
# money = await money_counter()
money = await loop.run_in_executor(thread_pool, money_counter)
print("get money {}".format(money))
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(guest("yzh")),
asyncio.ensure_future(guest("zhh"))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
print("continue")
input()
如何把一个使用callback的函数包装成能使用await语法的乖宝宝呢? 答案是,使用future来进行封装。下面有一个小例子。
import asyncio
import time
from multiprocessing.dummy import Pool as ThreadPool
loop = asyncio.get_event_loop()
tasks = []
def tasks_run():
while True:
if tasks:
time.sleep(1)
temp_task = tasks.pop(0)
loop.call_soon_threadsafe(temp_task) # 在其它线程调用callback需要使用这个函数
time.sleep(1)
p = ThreadPool(2)
p.apply_async(tasks_run)
def asyncfn(clb):
tasks.append(clb)
def wraper():
myfuture = asyncio.Future()
print("clb0 {}".format(myfuture))
def clb():
print("clb2 {}".format(myfuture))
myfuture.set_result("hello")
asyncfn(clb)
return myfuture
async def main1():
data = await wraper()
print("data is {}".format(data))
loop.run_until_complete(main1())
loop.close()
如果让我来实现单线程异步框架,我要怎么做呢? 其实很简单,所谓异步,一定要有调度,要能并行。要并行就一定不能阻塞,要有多线程,或者调用其它的异步接口(比如IO,数据库)。 最简单的方式如下:
loop = Loop()
loop.add(task1)
loop.add(task2)
def task1():
dosth_before()
result = yield dosth_async1()
return result
def task2():
dosth_before()
result = yield dosth_async2()
return result
loop中,循环这两个任务,首先执行task1,当执行到result = yield dosth_async1()
的时候,yield dosth_async1()
执行(此时还没result的事)。这里一定不能阻塞。所以,它立即就返回了dosth_async1()
对象。
重点来了,此时loop不要去管task1了,马上起动task2,执行到yield dosth_async2()
后,又跳出了task2,此时,所有的任务都循环了一遍。该执行第二遍loop了。
在第二次循环中,如果dosth_async1()
已经有结果了。我们点task1(用task1.send(result)),让它继续执行,task1得到了result,该干嘛干嘛。执行完毕后,轮到了task2,故技重施,如果dosth_async2
有结果了,那就点task2。
整个loop执行完毕。