首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用FastApi中的服务器发送事件向某些客户端发送通知?

使用FastApi中的服务器发送事件向某些客户端发送通知?
EN

Stack Overflow用户
提问于 2022-03-14 18:56:00
回答 1查看 1.1K关注 0票数 0

我一直在与服务器发送事件一起工作,以便只向某些客户端发送某些类型的通知。我正在使用名为starlette的模块来尝试实现这一目标。我对FastApi相当陌生,所以我无法弄清楚如何只将数据发送到特定的客户端,而不是向每个人广播。

到目前为止,我是这么想的:

使用查询param订阅请求

代码语言:javascript
运行
复制
localhost:8000/subscribe?id=1
代码语言:javascript
运行
复制
from sse_starlette.sse import EventSourceResponse


class EmitEventModel(BaseModel):
    event_name: str
    event_data: Optional[str] = "No Event Data"
    event_id: Optional[int] = None
    recipient_id: str

async def connection_established():
    yield dict(data="Connection established")

clients = {}


@app.get("/subscribe")
async def loopBackStream(req: Request, id: str = ""):
    clients[id] = EventSourceResponse(connection_established())
    return clients[id]


@app.post("/emit")
async def emitEvent(event: EmitEventModel):
    if clients[event.recipient_id]:
        clients[event.recipient_id](publish_event())

每当有一个对localhost:8000/emit的api调用包含主体时,基于recipient_id,事件将被路由。当然到目前为止还不起作用。对于应该做些什么来实现这一点,有什么建议吗?

sse_starlette供参考:https://github.com/sysid/sse-starlette/blob/master/sse_starlette/sse.py

EN

Stack Overflow用户

发布于 2022-04-23 04:54:40

这里的想法是,您需要识别SSE生成器上的recipient_id。我稍微修改了您的代码,以便能够显示我的意思:

代码语言:javascript
运行
复制
from __future__ import annotations 

import asyncio
import itertools
from collections import defaultdict

from fastapi import Request, FastAPI
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse


app = FastAPI()
clients = defaultdict(list)


class EmitEventModel(BaseModel):
    event_name: str
    event_data: Optional[str] = "No Event Data"
    event_id: Optional[int] = None
    recipient_id: str


async def retrieve_events(recipient_id: str) -> NoReturn:
    yield dict(data="Connection established")
    while True:
        if recipient_id in clients and len(clients[recipient_id]) > 0:
            yield clients[recipient_id].pop()
        await asyncio.sleep(1)
        print(clients)
        

@app.get("/subscribe/{recipient_id}")
async def loopBackStream(req: Request, recipient_id: str):
    return EventSourceResponse(retrieve_events(recipient_id))


@app.post("/emit")
async def emitEvent(event: EmitEventModel):
    clients[event.recipient_id].append(event)
票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71472890

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档