下面的代码工作正常,但我有点困惑于以下内容:
DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
和
DeprecationWarning: There is no current event loop
loop.run_until_complete(asyncio.gather(*coroutine_list))
警告。
我找到了这和这,但我困惑的是如何更新我的代码。我知道现在还没有一个偶数循环,但是我如何设置它呢?我以为我需要用asyncio.new_event_loop()
来改变asyncio.new_event_loop()
,但这似乎行不通。
这是我的密码
async def flush(self, queue_c) -> None:
try:
loop = asyncio.get_running_loop()
result_objects_action = loop.run_in_executor(None, self.create_objects, queue_c)
# do stuff
except:
# except handling
def _run_queue(self) -> None:
coroutine_list = []
c = 0
while c < len(self._queue):
coroutine_list.append(self.flush(c))
c+=1
# What should be changed here?
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*coroutine_list))
self._queue = []
发布于 2022-09-28 16:17:34
调用asyncio.new_event_loop()是不够的,您需要在使用asyncio.set_event_loop(循环)之后设置它。我相信python3.10引入了get_event_loop()的这个弃用警告。下面是如何设置事件循环,以便它可以与任何python 3.x版本一起工作:
import sys
import asyncio
if sys.version_info < (3, 10):
loop = asyncio.get_event_loop()
else:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
https://stackoverflow.com/questions/73884117
复制相似问题