与下面的示例一样,我在使用异步生成器时遇到了一个不寻常的错误。
async def demo():
async def get_data():
for i in range(5): # loop: for or while
await asyncio.sleep(1) # some IO code
yield i
datas = get_data()
await asyncio.gather(
anext(datas),
anext(datas),
anext(datas),
anext(datas),
anext(datas),
)
if __name__ == '__main__':
# asyncio.run(main())
asyncio.run(demo())控制台输出:
2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
asyncio.run(demo())
File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
return future.result()
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running情境描述:我有一个循环逻辑,它一次从Redis获取一批数据,我想使用产率来返回结果。但是,当我创建一个并发任务时,会发生此错误。
这种情况有什么好的解决办法吗?我并不是想改变我现在使用它的方式,而是看看我是否能够判断它是否正在运行,或者类似于锁之类的东西,然后等待它运行,然后执行一个but。
也许我的逻辑现在是不合理的,但我也想理解一些批评的语言,让我意识到这一点的严重性。
谢谢你的帮助。
发布于 2022-09-20 17:31:20
TL;DR:正确的方法
异步生成器非常适合并行消耗。见下面的解释。作为一种适当的解决办法,请使用asyncio.Queue进行生产者和消费者之间的通信:
queue = asyncio.Queue()
async def producer():
for item in range(5):
await asyncio.sleep(random.random()) # imitate async fetching
print('item fetched:', item)
await queue.put(item)
async def consumer():
while True:
item = await queue.get()
await asyncio.sleep(random.random()) # imitate async processing
print('item processed:', item)
await asyncio.gather(producer(), consumer(), consumer())上面的代码片段可以很好地处理一个无限的项目流:例如,一个web服务器,它将永远运行,为客户端的请求提供服务。但是,如果我们需要处理有限数量的项目呢?,如何知道何时停止?**
这是关于堆栈溢出的另一个问题,以涵盖所有可选方案,但最简单的选项是sentinel方法,如下所述。
哨兵:有限数据流法
介绍一个sentinel = object()。当获取来自外部数据源的所有项并将其放入队列时,producer必须将尽可能多的sentinel推送到队列中,就像您拥有的consumer那样。一旦一个consumer获取了sentinel,它就知道它应该停止:if item is sentinel: break循环。
sentinel = object()
consumers_count = 2
async def producer():
... # the same code as above
if new_item is None: # if no new data
for _ in range(consumers_count):
await queue.put(sentinel)
async def consumer():
while True:
... # the same code as above
if item is sentinel:
break
await asyncio.gather(
producer(),
*(consumer() for _ in range(consumers_count)),
)TL;DR 2:一个肮脏的解决办法
由于您不需要更改异步生成器方法,这里有一个基于异步的替代方案。要解决这个问题(以简单而又脏的方式),您可以用锁包装源异步生成器:
async def with_lock(agen, lock: asyncio.Lock):
while True:
async with lock: # only one consumer is allowed to read
try:
yield await anext(agen)
except StopAsyncIteration:
break
lock = asyncio.Lock() # a common lock for all consumers
await asyncio.gather(
# every consumer must have its own "wrapped" generator
anext(with_lock(datas, lock)),
anext(with_lock(datas, lock)),
...
)这将确保每次只有一个使用者在等待生成器中的项。在此使用者等待时,其他使用者正在执行,因此并行化不会丢失。
与async for大致相同的代码(看起来更聪明一些):
async def with_lock(agen, lock: asyncio.Lock):
await lock.acquire()
async for item in agen:
lock.release()
yield item
await lock.acquire()
lock.release()但是,此代码只处理异步生成器的anext方法。而generators还包括aclose和athrow方法。见下面的解释。
尽管您也可以将对这些的支持添加到with_lock函数中,但我建议要么将生成器子类化并在内部处理锁支持,要么更好地使用上面的Queue-based方法。
请参阅contextlib.aclosing获得一些灵感。
解释
同步和异步生成器都有一个特殊的属性:.gi_running (用于常规生成器)和.ag_running (用于异步生成器)。您可以通过在生成器上执行dir来发现它们:
>>> dir((i for i in range(0))
[..., 'gi_running', ...]当执行生成器的True或.__anext__方法时,它们被设置为.__anext__ (next(...)和anext(...)只是这些方法的语法糖)。
这会防止在生成器上重新执行next(...),而同一生成器上的另一个next(...)调用已经被执行:如果正在运行的标志是True,则会引发异常(对于同步生成器,它会引发ValueError: generator already executing)。
因此,回到您的示例,当您运行await anext(datas) (通过asyncio.gather)时,会发生以下情况:
datas.ag_running设置为True。datas.__anext__方法。__anext__方法(在您的例子中是await asyncio.sleep(1))中到达内部__anext__语句,asyncio的循环就切换到另一个使用者。await anext(datas),但是由于datas.ag_running标志仍然设置为True,这将导致RuntimeError。为什么需要这面旗子?
可以暂停并恢复生成器的执行。但仅限于yield语句。因此,如果生成器在内部await语句处暂停,它就不能“恢复”,因为它的状态不允许它。
这就是为什么对生成器的并行next/anext调用会引发异常:它还没有准备好恢复,它已经在运行。
athrow和aclose
生成器的API (同步和异步)不仅包括用于迭代的send/asend方法,还包括:
close/aclose在退出或异常时释放生成器分配的资源(例如数据库连接)throw/athrow通知生成器它必须处理异常。aclose和athrow也是异步方法。这意味着,如果两个使用者试图并行关闭/抛出基础生成器,您将遇到相同的问题,因为生成器将关闭(或处理异常),而关闭(抛出异常)。
同步生成器示例
虽然对于异步生成器来说,这是一个常见的情况,但是为同步生成器重新生成它并不那么幼稚,因为同步next(...)调用很少被中断。
中断同步生成器的一种方法是使用多个使用者(运行在并行线程中)从单个生成器中读取多线程代码。在这种情况下,当生成器的代码在执行next调用时被中断时,所有其他使用者调用next的并行尝试都会导致异常。
实现这一目标的另一种方法是通过自用生成器在发电机相关PEP #255中演示的:
>>> def g():
... i = next(me)
... yield i
...
>>> me = g()
>>> next(me)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in g
ValueError: generator already executing调用外部next(me)时,它将me.gi_running设置为True,然后执行生成器函数代码。随后的内部next(me)调用将导致ValueError。
结论
当单个读取器使用时,生成器(特别是异步)工作得最好。多用户支持很难,因为需要对生成器的所有方法进行修补,因此不鼓励。
https://stackoverflow.com/questions/72204244
复制相似问题