首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为什么在anyio上运行多达40个任务?

为什么在anyio上运行多达40个任务?
EN

Stack Overflow用户
提问于 2021-11-21 06:09:51
回答 1查看 73关注 0票数 0
代码语言:javascript
运行
复制
import logging
import time
from datetime import datetime

import anyio
import numpy as np
from anyio.streams.memory import MemoryObjectReceiveStream as rstream
from anyio.streams.memory import MemoryObjectSendStream as sstream

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)


async def ping_num(send: sstream):
    async with send:
        for num in range(100):
            await send.send(num)


async def pong_num(receive_num: rstream, send_other_num: sstream, key: int):
    async with receive_num, send_other_num:
        async for num in receive_num:
            await send_other_num.send((key, num, np.sqrt(num)))
        send_other_num.close()


async def async_sleep_5_and_print(receive_other: rstream):
    async with receive_other:
        async with anyio.create_task_group() as task_group:
            async for other in receive_other:
                task_group.start_soon(
                    anyio.to_thread.run_sync, sync_sleep_5_and_print, other
                )


def sync_sleep_5_and_print(item):
    logging.info(f"start:: {datetime.today()} {item=}")
    time.sleep(5)
    logging.info(f"  end:: {datetime.today()} {item=}")


async def main():
    send, receive = anyio.create_memory_object_stream()
    send_other, receive_other = anyio.create_memory_object_stream()
    async with anyio.create_task_group() as task_group:
        async with send:
            task_group.start_soon(ping_num, send.clone())
        async with receive, send_other:
            for key in range(5):
                task_group.start_soon(
                    pong_num, receive.clone(), send_other.clone(), key
                )
        async with receive_other:
            task_group.start_soon(async_sleep_5_and_print, receive_other.clone())
    logger.info("main end")


if __name__ == "__main__":
    anyio.run(main)

日志:

代码语言:javascript
运行
复制
INFO:root:start:: 2021-11-21 14:45:48.113164 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:48.117558 item=(1, 1, 1.0)
INFO:root:start:: 2021-11-21 14:45:48.122124 item=(2, 2, 1.4142135623730951)

...

INFO:root:start:: 2021-11-21 14:45:48.149377 item=(3, 38, 6.164414002968976)
INFO:root:start:: 2021-11-21 14:45:48.154694 item=(4, 39, 6.244997998398398)
INFO:root:  end:: 2021-11-21 14:45:53.115420 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:53.116359 item=(4, 99, 9.9498743710662)

...

预计100个任务将一起运行,并在大约5秒内结束,但它花了15秒。

从日志中可以看出,它似乎可以同时运行多达40个任务。

我将后端改为trio,但同样的问题也出现了。

为什么会发生这种情况?

在上面的代码中有没有解决这个问题的方法?

EN

Stack Overflow用户

发布于 2021-11-21 12:46:04

当我了解到更多关于anyio.to_thread.run_sync,的知识时,我发现它正在接收一个名为limiter的关键字参数。

如果接收到None值,则使用默认限制器。

大概,这个基本限制器被指定为40。

因此,我添加并修改了以下代码来实现我想要的操作。

可能有人和我有同样的问题,所以我把它留作答案。

代码语言:javascript
运行
复制
from functools import partial

async def async_sleep_5_and_print(receive_other: rstream):
    limit = anyio.CapacityLimiter(100)
    run_sync_with_limit = partial(anyio.to_thread.run_sync, limiter=limit)
    async with receive_other:
        async with anyio.create_task_group() as task_group:
            async for other in receive_other:
                task_group.start_soon(
                    run_sync_with_limit, sync_sleep_5_and_print, other
                )

日志:

代码语言:javascript
运行
复制
INFO:root:start:: 2021-11-21 21:40:24.235837 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 21:40:24.237306 item=(1, 1, 1.0)
INFO:root:start:: 2021-11-21 21:40:24.237800 item=(2, 2, 1.4142135623730951)
INFO:root:start:: 2021-11-21 21:40:24.238302 item=(3, 3, 1.7320508075688772)
INFO:root:start:: 2021-11-21 21:40:24.238695 item=(4, 4, 2.0)

...

INFO:root:  end:: 2021-11-21 21:40:29.302220 item=(4, 89, 9.433981132056603)
INFO:root:  end:: 2021-11-21 21:40:29.302260 item=(4, 94, 9.695359714832659)
INFO:root:  end:: 2021-11-21 21:40:29.303010 item=(0, 95, 9.746794344808963)
INFO:root:  end:: 2021-11-21 21:40:29.303086 item=(2, 97, 9.848857801796104)
INFO:__main__:main end

我找到了anyio._backends_._asyncio.current_default_thread_limiter

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70052250

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档