首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python:异步生成器已经在运行

Python:异步生成器已经在运行
EN

Stack Overflow用户
提问于 2022-05-11 16:05:12
回答 1查看 359关注 0票数 2

与下面的示例一样,我在使用异步生成器时遇到了一个不寻常的错误。

代码语言:javascript
复制
async def demo():
    async def get_data():
        for i in range(5):  # loop: for or while
            await asyncio.sleep(1)  # some IO code

            yield i

    datas = get_data()

    await asyncio.gather(
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
    )


if __name__ == '__main__':
    # asyncio.run(main())
    asyncio.run(demo())

控制台输出:

代码语言:javascript
复制
2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
    asyncio.run(demo())
  File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
    await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running

情境描述:我有一个循环逻辑,它一次从Redis获取一批数据,我想使用产率来返回结果。但是,当我创建一个并发任务时,会发生此错误。

这种情况有什么好的解决办法吗?我并不是想改变我现在使用它的方式,而是看看我是否能够判断它是否正在运行,或者类似于锁之类的东西,然后等待它运行,然后执行一个but。

也许我的逻辑现在是不合理的,但我也想理解一些批评的语言,让我意识到这一点的严重性。

谢谢你的帮助。

EN

回答 1

Stack Overflow用户

发布于 2022-09-20 17:31:20

TL;DR:正确的方法

异步生成器非常适合并行消耗。见下面的解释。作为一种适当的解决办法,请使用asyncio.Queue进行生产者和消费者之间的通信:

代码语言:javascript
复制
queue = asyncio.Queue()

async def producer():
    for item in range(5):
        await asyncio.sleep(random.random())  # imitate async fetching
        print('item fetched:', item)
        await queue.put(item)

async def consumer():
    while True:
        item = await queue.get()
        await asyncio.sleep(random.random())  # imitate async processing
        print('item processed:', item)

await asyncio.gather(producer(), consumer(), consumer())

上面的代码片段可以很好地处理一个无限的项目流:例如,一个web服务器,它将永远运行,为客户端的请求提供服务。但是,如果我们需要处理有限数量的项目呢?如何知道何时停止?**

这是关于堆栈溢出的另一个问题,以涵盖所有可选方案,但最简单的选项是sentinel方法,如下所述。

哨兵:有限数据流法

介绍一个sentinel = object()。当获取来自外部数据源的所有项并将其放入队列时,producer必须将尽可能多的sentinel推送到队列中,就像您拥有的consumer那样。一旦一个consumer获取了sentinel,它就知道它应该停止:if item is sentinel: break循环。

代码语言:javascript
复制
sentinel = object()
consumers_count = 2

async def producer():
    ...  # the same code as above
    if new_item is None:  # if no new data
        for _ in range(consumers_count):
            await queue.put(sentinel)

async def consumer():
    while True:
        ...  # the same code as above
        if item is sentinel:
            break

await asyncio.gather(
    producer(),
    *(consumer() for _ in range(consumers_count)),
)

TL;DR 2:一个肮脏的解决办法

由于您不需要更改异步生成器方法,这里有一个基于异步的替代方案。要解决这个问题(以简单而又脏的方式),您可以用锁包装源异步生成器:

代码语言:javascript
复制
async def with_lock(agen, lock: asyncio.Lock):
    while True:
        async with lock:  # only one consumer is allowed to read
            try:
                yield await anext(agen)
            except StopAsyncIteration:
                break

lock = asyncio.Lock()  # a common lock for all consumers
await asyncio.gather(
    # every consumer must have its own "wrapped" generator
    anext(with_lock(datas, lock)),
    anext(with_lock(datas, lock)),
    ...
)

这将确保每次只有一个使用者在等待生成器中的项。在此使用者等待时,其他使用者正在执行,因此并行化不会丢失。

async for大致相同的代码(看起来更聪明一些):

代码语言:javascript
复制
async def with_lock(agen, lock: asyncio.Lock):
    await lock.acquire()
    async for item in agen:
        lock.release()
        yield item
        await lock.acquire()
    lock.release()

但是,此代码只处理异步生成器的anext方法。而generators还包括acloseathrow方法。见下面的解释。

尽管您也可以将对这些的支持添加到with_lock函数中,但我建议要么将生成器子类化并在内部处理锁支持,要么更好地使用上面的Queue-based方法。

请参阅contextlib.aclosing获得一些灵感。

解释

同步和异步生成器都有一个特殊的属性:.gi_running (用于常规生成器)和.ag_running (用于异步生成器)。您可以通过在生成器上执行dir来发现它们:

代码语言:javascript
复制
>>> dir((i for i in range(0))
[..., 'gi_running', ...]

当执行生成器的True.__anext__方法时,它们被设置为.__anext__ (next(...)anext(...)只是这些方法的语法糖)。

这会防止在生成器上重新执行next(...),而同一生成器上的另一个next(...)调用已经被执行:如果正在运行的标志是True,则会引发异常(对于同步生成器,它会引发ValueError: generator already executing)。

因此,回到您的示例,当您运行await anext(datas) (通过asyncio.gather)时,会发生以下情况:

  1. datas.ag_running设置为True
  2. 执行流进入datas.__anext__方法。
  3. 一旦在__anext__方法(在您的例子中是await asyncio.sleep(1))中到达内部__anext__语句,asyncio的循环就切换到另一个使用者。
  4. 现在,另一个使用者也尝试调用await anext(datas),但是由于datas.ag_running标志仍然设置为True,这将导致RuntimeError

为什么需要这面旗子?

可以暂停并恢复生成器的执行。但仅限于yield语句。因此,如果生成器在内部await语句处暂停,它就不能“恢复”,因为它的状态不允许它。

这就是为什么对生成器的并行next/anext调用会引发异常:它还没有准备好恢复,它已经在运行。

athrowaclose

生成器的API (同步和异步)不仅包括用于迭代的send/asend方法,还包括:

  • close/aclose在退出或异常时释放生成器分配的资源(例如数据库连接)
  • throw/athrow通知生成器它必须处理异常。

acloseathrow也是异步方法。这意味着,如果两个使用者试图并行关闭/抛出基础生成器,您将遇到相同的问题,因为生成器将关闭(或处理异常),而关闭(抛出异常)。

同步生成器示例

虽然对于异步生成器来说,这是一个常见的情况,但是为同步生成器重新生成它并不那么幼稚,因为同步next(...)调用很少被中断。

中断同步生成器的一种方法是使用多个使用者(运行在并行线程中)从单个生成器中读取多线程代码。在这种情况下,当生成器的代码在执行next调用时被中断时,所有其他使用者调用next的并行尝试都会导致异常。

实现这一目标的另一种方法是通过自用生成器在发电机相关PEP #255中演示的:

代码语言:javascript
复制
>>> def g():
...     i = next(me)
...     yield i
... 
>>> me = g()
>>> next(me)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in g
ValueError: generator already executing

调用外部next(me)时,它将me.gi_running设置为True,然后执行生成器函数代码。随后的内部next(me)调用将导致ValueError

结论

当单个读取器使用时,生成器(特别是异步)工作得最好。多用户支持很难,因为需要对生成器的所有方法进行修补,因此不鼓励。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72204244

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档