我一直在与服务器发送事件一起工作,以便只向某些客户端发送某些类型的通知。我正在使用名为starlette的模块来尝试实现这一目标。我对FastApi相当陌生,所以我无法弄清楚如何只将数据发送到特定的客户端,而不是向每个人广播。
到目前为止,我是这么想的:
使用查询param订阅请求
localhost:8000/subscribe?id=1
from sse_starlette.sse import EventSourceResponse
class EmitEventModel(BaseModel):
event_name: str
event_data: Optional[str] = "No Event Data"
event_id: Optional[int] = None
recipient_id: str
async def connection_established():
yield dict(data="Connection established")
clients = {}
@app.get("/subscribe")
async def loopBackStream(req: Request, id: str = ""):
clients[id] = EventSourceResponse(connection_established())
return clients[id]
@app.post("/emit")
async def emitEvent(event: EmitEventModel):
if clients[event.recipient_id]:
clients[event.recipient_id](publish_event())
每当有一个对localhost:8000/emit
的api调用包含主体时,基于recipient_id,事件将被路由。当然到目前为止还不起作用。对于应该做些什么来实现这一点,有什么建议吗?
sse_starlette供参考:https://github.com/sysid/sse-starlette/blob/master/sse_starlette/sse.py
发布于 2022-04-23 04:54:40
这里的想法是,您需要识别SSE生成器上的recipient_id
。我稍微修改了您的代码,以便能够显示我的意思:
from __future__ import annotations
import asyncio
import itertools
from collections import defaultdict
from fastapi import Request, FastAPI
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
clients = defaultdict(list)
class EmitEventModel(BaseModel):
event_name: str
event_data: Optional[str] = "No Event Data"
event_id: Optional[int] = None
recipient_id: str
async def retrieve_events(recipient_id: str) -> NoReturn:
yield dict(data="Connection established")
while True:
if recipient_id in clients and len(clients[recipient_id]) > 0:
yield clients[recipient_id].pop()
await asyncio.sleep(1)
print(clients)
@app.get("/subscribe/{recipient_id}")
async def loopBackStream(req: Request, recipient_id: str):
return EventSourceResponse(retrieve_events(recipient_id))
@app.post("/emit")
async def emitEvent(event: EmitEventModel):
clients[event.recipient_id].append(event)
https://stackoverflow.com/questions/71472890
复制相似问题