我试着用运行下面的代码
import asyncio
async def test():
for _ in range(3):
print("Test")
await asyncio.sleep(1)
loop = asyncio.get_running_loop() # Here
loop.run_until_complete(test())
但是,我发现了下面的错误:
RuntimeError:不运行事件循环
我可以用下面所示的代替来运行上面的代码,但是自从3.10版本的之后,就不再受欢迎了,所以我不想使用它。
# ...
loop = asyncio.get_event_loop() # Here
# loop = asyncio.get_running_loop()
# ...
因此,结果如下:
Test
Test
Test
那么,如何使用运行上面的代码?
发布于 2022-10-31 01:26:05
您得到了错误,因为asyncio.get_running_loop()
试图获得一个正在运行的事件循环,但是没有正在运行的事件循环:
返回当前OS线程中正在运行的事件循环。
如果没有正在运行的事件循环,将引发RuntimeError。此函数只能从coroutine或回调调用。
因此,您需要使用asyncio.get_running_loop()
创建和设置一个和事件循环,如下所示:
import asyncio
async def test():
for _ in range(3):
print("Test")
await asyncio.sleep(1)
loop = asyncio.new_event_loop() # Here
asyncio.set_event_loop(loop) # Here
# loop = asyncio.get_running_loop()
loop.run_until_complete(test())
然后,您的代码正常工作:
Test
Test
Test
此外,您还可以像下面这样使用asyncio.get_running_loop()
。在本例中,有一个正在运行的事件循环,asyncio.get_running_loop()
获取它,因此不会发生错误:
import asyncio
async def test():
for _ in range(3):
print("Test")
await asyncio.sleep(1)
async def call_test():
loop = asyncio.get_running_loop() # Here
await loop.create_task(test())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(call_test())
然后,这段代码也能正常工作:
Test
Test
Test
https://stackoverflow.com/questions/74257941
复制相似问题