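I was trying to find a way to limit the number of concurrent HTTP requests made to a server using Python's asyncio and httpx modules. I came across this StackOverflow answer. It proposes using asyncio.Semaphore to stop multiple consumers from making too many requests. While that answer works perfectly, it uses explicit loop construction rather than asyncio.run. When I replace the explicit loop construction with asyncio.run, the behavior of the code changes: instead of performing all 9 requests, it performs three requests and then stops.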
import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i in range(9)
    ]
    await asyncio.gather(*tasks, return_exceptions=True)  # await moment all downloads done


if __name__ == '__main__':
    asyncio.run(main())
This prints:
downloading 0 will take 3 second(s)
downloading 1 will take 1 second(s)
downloading 2 will take 3 second(s)
downloaded 1
downloaded 0
downloaded 2
I had to change await asyncio.gather(*tasks) to await asyncio.gather(*tasks, return_exceptions=True) so that the code doesn't raise a RuntimeError. Otherwise it raises the error below (I have asyncio debug mode turned on):
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
Traceback (most recent call last):
  File "/home/rednafi/workspace/personal/demo/demo.py", line 66, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/rednafi/workspace/personal/demo/demo.py", line 62, in main
    await asyncio.gather(*tasks) # await moment all downloads done
  File "/home/rednafi/workspace/personal/demo/demo.py", line 52, in safe_download
    async with sem: # semaphore limits num of simultaneous downloads
  File "/usr/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-5' coro=<safe_download() running at /home/rednafi/workspace/personal/demo/demo.py:52> cb=[gather.<locals>._done_callback() at /usr/lib/python3.9/asyncio/tasks.py:764] created at /home/rednafi/workspace/personal/demo/demo.py:58> got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
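However, the only other change was replacing the explicit loop with asyncio.run.
The question is: why did the behavior of the code change, and how can I get the old behavior back?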
Answered on 2021-04-18 19:44:55
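The problem is that a Semaphore created at top level caches the event loop that was active when it was created (an event loop automatically created by asyncio and returned by get_event_loop() at startup). asyncio.run(), on the other hand, creates a fresh event loop on each run. As a result, you are trying to await a semaphore that belongs to a different event loop, which fails. The first three tasks acquire the semaphore without waiting, so they run normally; the remaining six have to wait, and that wait is where the RuntimeError is raised. As always, hiding an exception without understanding its cause only leads to further problems down the line.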
To fix the issue properly, you should create the semaphore from inside asyncio.run(). For example, the simplest fix could look like this:
# ...
sem = None


async def main():
    global sem
    sem = asyncio.Semaphore(3)
    # ...
A more elegant approach is to remove sem from the top level entirely and pass it to safe_download explicitly:
async def safe_download(i, limit):
    async with limit:
        return await download(i)


async def main():
    # limit parallel downloads to 3 at most
    limit = asyncio.Semaphore(3)
    # you don't need to explicitly call create_task() if you call
    # `gather()` because `gather()` will do it for you
    await asyncio.gather(*[safe_download(i, limit) for i in range(9)])
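For completeness, here is a self-contained sketch of the same approach that can be run as-is. It is an illustration under assumptions, not the original code: download is replaced by a short sleep, and the stats dictionary is a hypothetical addition used only to record how many downloads are in flight at once.

```python
import asyncio


async def download(code, stats):
    # Stand-in for a real HTTP request; tracks how many downloads overlap.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)
    stats["active"] -= 1
    return code


async def safe_download(code, limit, stats):
    async with limit:  # at most 3 coroutines get past this point at a time
        return await download(code, stats)


async def main():
    # Created inside the coroutine, so it belongs to the loop asyncio.run() started.
    limit = asyncio.Semaphore(3)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(safe_download(i, limit, stats) for i in range(9))
    )
    return results, stats["peak"]


results, peak = asyncio.run(main())
print(results)  # all nine results, in the order the coroutines were passed to gather()
print(peak)     # highest number of downloads that overlapped; never more than 3
```

All nine downloads complete and no return_exceptions=True is needed, because the semaphore and the tasks share one event loop.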
https://stackoverflow.com/questions/67152371