首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何向多个消费者广播异步StreamReader?

如何向多个消费者广播异步StreamReader?
EN

Stack Overflow用户
提问于 2020-05-12 13:52:52
回答 1查看 236关注 0票数 2

我正在尝试使用aiohttp来制作一种高级的反向代理。

我想获取HTTP请求的内容,并将其传递给新的HTTP请求,而不是将其拉入内存。虽然只有一个上游,但任务相当简单: aiohttp服务器将请求内容作为StreamReader返回,而aiohttp客户端可以接受StreamReader作为请求正文。

问题是我想要向多个上游发送回源请求,或者,例如,同时向上游发送内容并将其写入磁盘。

是否有一些工具来播放StreamReader的内容

我试过制作一些天真的广播器,但在大型对象上失败了。我做错了什么?

代码语言:javascript
运行
复制
class StreamBroadcast:
    async def __do_broadcast(self):
        while True:
            chunk = await self.__source.read(self.__n)
            if not chunk:
                break
            for output in self.__sinks:
                output.feed_data(chunk)
        for output in self.__sinks:
            output.feed_eof()

    def __init__(self, source: StreamReader, sinks_count: int, n: int = -1):
        self.__source = source
        self.__n = n
        self.__sinks = [StreamReader() for i in range(sinks_count)]
        self.__task = asyncio.create_task(self.__do_broadcast())

    @property
    def sinks(self) -> Iterable[StreamReader]:
        return self.__sinks

    @property
    def ready(self) -> Task:
        return self.__task
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-22 21:56:07

嗯,我查阅了asyncio源代码,发现我应该使用Transport在流上抽取数据。这是我的解决方案。

代码语言:javascript
运行
复制
import asyncio
from asyncio import StreamReader, StreamWriter, ReadTransport, StreamReaderProtocol
from typing import Iterable


class _BroadcastReadTransport(ReadTransport):
    """
    Internal class, is not meant to be instantiated manually
    """

    def __init__(self, source: StreamReader, sinks: Iterable[StreamReader]):
        super().__init__()
        self.__source = source
        self.__sinks = tuple(StreamReaderProtocol(s) for s in sinks)
        for sink in sinks:
            sink.set_transport(self)
        self.__waiting_for_data = len(self.__sinks)

        asyncio.create_task(self.__broadcast_next_chunk(), name='initial-chunk-broadcast')

    def is_reading(self):
        return self.__waiting_for_data == len(self.__sinks)

    def pause_reading(self):
        self.__waiting_for_data -= 1

    async def __broadcast_next_chunk(self):
        data = await self.__source.read()
        if data:
            for sink in self.__sinks:
                sink.data_received(data)
            if self.is_reading():
                asyncio.create_task(self.__broadcast_next_chunk())
        else:
            for sink in self.__sinks:
                sink.eof_received()

    def resume_reading(self):
        self.__waiting_for_data += 1
        if self.__waiting_for_data == len(self.__sinks):
            asyncio.create_task(self.__broadcast_next_chunk(), name='chunk-broadcast')

    @property
    def is_completed(self):
        return self.__source.at_eof()


class StreamBroadcast:
    def __init__(self, source: StreamReader, sinks_count: int):
        self.__source = source
        self.__sinks = tuple(StreamReader() for _ in range(sinks_count))
        self.__transport = _BroadcastReadTransport(self.__source, self.__sinks)

    @property
    def sinks(self) -> Iterable[StreamReader]:
        return self.__sinks

    @property
    def is_completed(self):
        return self.__transport.is_completed

希望有一次我会把它打包到pip模块中。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61744721

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档