首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python异步生成器而不是异步

Python异步生成器而不是异步
EN

Stack Overflow用户
提问于 2019-07-05 11:26:31
回答 2查看 1.4K关注 0票数 7

我的代码如下。我希望这两个睡眠可以共享相同的时间框架,并花费1+2*3=7秒来运行脚本。但是似乎发生了一些错误,所以仍然需要3*(1+2)秒。

你知道如何修改代码吗?

代码语言:javascript
复制
import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-07-05 11:50:10

async/await的重点是交叉任务,而不是函数/生成器。例如,当您await asyncio.sleep(1)时,您当前的协同作用会随着睡眠而延迟。类似地,async for将其协同线延迟到下一项准备就绪。

为了运行独立的功能,必须将每个部分创建为一个单独的任务。使用Queue在它们之间交换项--任务只会延迟到它们交换了项为止。

代码语言:javascript
复制
from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # resume once item is fetched
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once item is fetched
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())

如果您经常需要此功能,还可以将其放入包装异步迭代的helper对象中。助手封装队列和单独的任务。您可以直接在async for语句中的异步迭代上应用该助手。

代码语言:javascript
复制
from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())
票数 7
EN

Stack Overflow用户

发布于 2019-07-05 16:53:58

作为对队列执行此操作的替代方法,此解决方案将期货链连接在一起,以便未来的结果是当前项和检索下一项的另一个未来(可以说,类似于链接列表):

代码语言:javascript
复制
from asyncio import sleep, get_event_loop, run, create_task

async def aiter(fut, async_generator):
    try:
        async for item in async_generator:
            fut, prev_fut = get_event_loop().create_future(), fut
            prev_fut.set_result((item, fut))
        else:
            fut.set_exception(StopAsyncIteration())
    except Exception as e:
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = get_event_loop().create_future()
    create_task(aiter(fut, async_generator))

    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration as e:
        return

额外的好处是,该解决方案将正确地处理g()中发生的异常,方法是将main()方法中的异常重命名为具有对调试有用的追溯功能。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56902202

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档