首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >FastAPI WebSocket复制

FastAPI WebSocket复制
EN

Stack Overflow用户
提问于 2022-02-12 10:10:37
回答 1查看 1.2K关注 0票数 2

我用FastAPI实现了一个简单的FastAPI代理(使用这个例子)

应用程序的目标是将它获得的所有消息传递给它的活动连接(代理)。

它只适用于单个实例,因为它将活动的WebSocket连接保存在内存中。当有多个实例时,内存不会被共享。

我天真的方法是通过在某些共享存储(Redis)中保持活动连接来解决这个问题。但我被困在泡菜上了。

以下是完整的应用程序:

代码语言:javascript
运行
复制
import pickle

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis

app = FastAPI()
rds = redis.StrictRedis('localhost')

class ConnectionManager:
    def __init__(self):
        self.active_connections = defaultdict(dict)

    async def connect(self, websocket: WebSocket, application: str, client_id: str):
        await websocket.accept()
        if application not in self.active_connections:
            self.active_connections[application] = defaultdict(list)

        self.active_connections[application][client_id].append(websocket)

        #### this is my attempt to store connections ####
        rds.set('connections', pickle.dumps(self.active_connections)) 

    def disconnect(self, websocket: WebSocket, application: str, client_id: str):
        self.active_connections[application][client_id].remove(websocket)

    async def broadcast(self, message: dict, application: str, client_id: str):
        for connection in self.active_connections[application][client_id]:
            try:
                await connection.send_json(message)
                print(f"sent {message}")
            except Exception as e:
                pass


manager = ConnectionManager()


@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
    await manager.connect(websocket, application, client_id)
    while True:
        try:
            data = await websocket.receive_json()
            print(f"received: {data}")
            await manager.broadcast(data, application, client_id)
        except WebSocketDisconnect:
            manager.disconnect(websocket, application, client_id)
        except RuntimeError:
            break


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8005)

但是,酸洗websocket连接没有成功:

代码语言:javascript
运行
复制
AttributeError: Can't pickle local object 'FastAPI.setup.<locals>.openapi'

在应用程序实例中存储WebSocket连接的正确方法是什么?

UPD @AKX回答的实际解决方案。

服务器的每个实例都订阅Redis,并尝试将接收到的消息发送给其所有连接的客户端。

由于一个客户端无法连接到多个实例--每个消息只应传递给每个客户端一次。

代码语言:javascript
运行
复制
import json
import asyncio


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis

app = FastAPI()
rds = redis.StrictRedis('localhost')


class ConnectionManager:
    def __init__(self):
        self.active_connections = defaultdict(dict)

    async def connect(self, websocket: WebSocket, application: str, client_id: str):
        await websocket.accept()
        if application not in self.active_connections:
            self.active_connections[application] = defaultdict(list)

        self.active_connections[application][client_id].append(websocket)

    def disconnect(self, websocket: WebSocket, application: str, client_id: str):
        self.active_connections[application][client_id].remove(websocket)

    async def broadcast(self, message: dict, application: str, client_id: str):
        for connection in self.active_connections[application][client_id]:
            try:
                await connection.send_json(message)
                print(f"sent {message}")
            except Exception as e:
                pass

    async def consume(self):
        print("started to consume")
        sub = rds.pubsub()
        sub.subscribe('channel')
        while True:
            await asyncio.sleep(0.01)
            message = sub.get_message(ignore_subscribe_messages=True)
            if message is not None and isinstance(message, dict):
                msg = json.loads(message.get('data'))
                await self.broadcast(msg['message'], msg['application'], msg['client_id'])


manager = ConnectionManager()


@app.on_event("startup")
async def subscribe():
    asyncio.create_task(manager.consume())


@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
    await manager.connect(websocket, application, client_id)
    while True:
        try:
            data = await websocket.receive_json()
            print(f"received: {data}")
            rds.publish(
                'channel',
                json.dumps({
                   'application': application,
                   'client_id': client_id,
                   'message': data
                })
            )
        except WebSocketDisconnect:
            manager.disconnect(websocket, application, client_id)
        except RuntimeError:
            break


if __name__ == '__main__':  # pragma: no cover
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8005)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-02-12 10:17:08

在应用程序实例中存储WebSocket连接的正确方法是什么?

据我所知,多个进程没有共享websocket连接的实用方法。正如您注意到的,您不能对连接进行分类(尤其是因为您不能对表示网络连接的实际OS级文件描述符进行筛选)。您可以使用POSIX魔术将文件描述符发送到其他进程,但即便如此,您也需要确保进程知道websocket的状态,而不需要在发送或接收数据时进行竞争。

我可能会重新设计一些东西,使之具有管理websocket连接的单个进程,例如使用Redis (因为您已经拥有它) 耻骨来与其他多个进程进行通信。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71090906

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档