首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Python的asyncio.Semaphore控制HTTP请求的并发性

使用Python的asyncio.Semaphore控制HTTP请求的并发性
EN

Stack Overflow用户
提问于 2021-04-18 19:03:12
回答 1查看 1.9K关注 0票数 2

我试图找到一种方法来限制使用Python的异步httpx模块向服务器发出的并发HTTP请求的数量。我偶然发现了这个. StackOverflow

它建议使用asyncio.Semaphore来阻止多个消费者提出太多的请求。虽然这个答案工作得很好,但它使用的是显式循环构造,而不是asyncio.run。当我用asyncio.run替换显式循环构造时,代码的行为会发生变化。现在,它不再执行所有9个请求,而是执行三个请求,然后停止。

代码语言:javascript
运行
复制
import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks, return_exceptions=True)  # await moment all downloads done


if __name__ ==  '__main__':
    asyncio.run(main())

这张打印出来:

代码语言:javascript
运行
复制
downloading 0 will take 3 second(s)
downloading 1 will take 1 second(s)
downloading 2 will take 3 second(s)
downloaded 1
downloaded 0
downloaded 2

我不得不将await asyncio.gather(*tasks)更改为await asyncio.gather(*tasks, return_exceptions=True),这样代码就不会抛出RuntimeError。否则它会抛出这个错误,我已经打开了异步调试模式。

代码语言:javascript
运行
复制
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
Traceback (most recent call last):
  File "/home/rednafi/workspace/personal/demo/demo.py", line 66, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/rednafi/workspace/personal/demo/demo.py", line 62, in main
    await asyncio.gather(*tasks)  # await moment all downloads done
  File "/home/rednafi/workspace/personal/demo/demo.py", line 52, in safe_download
    async with sem:  # semaphore limits num of simultaneous downloads
  File "/usr/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-5' coro=<safe_download() running at /home/rednafi/workspace/personal/demo/demo.py:52> cb=[gather.<locals>._done_callback() at /usr/lib/python3.9/asyncio/tasks.py:764] created at /home/rednafi/workspace/personal/demo/demo.py:58> got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop

但是,唯一的其他更改是用asyncio.run替换显式循环。

问题是为什么代码的行为会改变?我怎样才能找回那些老生常谈的行为呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-18 19:44:55

问题是在顶层创建的Semaphore缓存事件循环在创建过程中是活动的(异步自动创建的事件循环,启动时由get_event_loop()返回)。另一方面,asyncio.run()在每次运行时创建一个新的事件循环。因此,您正在尝试等待来自不同事件循环的信号量,但失败。和往常一样,隐藏异常而不了解其原因只会导致进一步的问题。

要正确解决这个问题,您应该在asyncio.run()内部创建信号量。例如,最简单的修补程序可以如下所示:

代码语言:javascript
运行
复制
# ...
sem = None

async def main():
    global sem
    sem = asyncio.Semaphore(3)
    # ...

更优雅的方法是从顶层完全删除sem,并显式地将其传递给safe_download

代码语言:javascript
运行
复制
async def safe_download(i, limit):
    async with limit:
        return await download(i)

async def main():
    # limit parallel downloads to 3 at most
    limit = asyncio.Semaphore(3)
    # you don't need to explicitly call create_task() if you call
    # `gather()` because `gather()` will do it for you
    await asyncio.gather(*[safe_download(i, limit) for i in range(9)])
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67152371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档