I recently finished optimizing a large-scale WebSocket server, raising the per-machine connection count from 10,000 to 100,000 and pushing the cluster total into the tens of millions. This post shares the technical details and hands-on lessons from that work.
1. Pain Points of the Original Architecture
Problems with the original WebSocket service:
# Traditional synchronous handling
class WebSocketServer:
    def __init__(self):
        self.clients = {}

    def handle_client(self, websocket):
        while True:
            message = websocket.recv()  # blocking call: this handler serves exactly one client
            self.process_message(message)
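With a blocking recv(), the only way to serve more clients is to dedicate a worker (typically a thread) to each connection. A minimal, hypothetical sketch of that pattern, just to show where the ceiling comes from (the listener object and its accept() call are illustrative, not from the original code):

import threading

def serve_forever(server: WebSocketServer, listener):
    # One OS thread per client: at ~10,000 connections the thread stacks and
    # context-switch overhead become the bottleneck, which is roughly where
    # the old architecture topped out.
    while True:
        websocket = listener.accept()          # illustrative: wait for a new client
        threading.Thread(
            target=server.handle_client,       # blocks inside websocket.recv()
            args=(websocket,),
            daemon=True,
        ).start()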
2. Coroutine-Based Refactoring
Asynchronous WebSocket server
import asyncio
import logging
from typing import Dict

from fastapi import FastAPI, WebSocket

logger = logging.getLogger(__name__)

class WebSocketManager:
    def __init__(self):
        self._active_connections: Dict[str, WebSocket] = {}
        self._message_queue = asyncio.Queue()
        self._connection_counter = 0

    async def connect(self, client_id: str, websocket: WebSocket):
        await websocket.accept()
        self._active_connections[client_id] = websocket
        self._connection_counter += 1
        # Connection monitoring ('metrics' is an external gauge helper, e.g. a StatsD/Prometheus wrapper)
        metrics.gauge(
            'websocket_connections',
            self._connection_counter
        )

    async def disconnect(self, client_id: str):
        if client_id in self._active_connections:
            await self._active_connections[client_id].close()
            del self._active_connections[client_id]
            self._connection_counter -= 1

    async def broadcast(self, message: dict):
        # Fan out to all clients concurrently with asyncio.gather
        tasks = [
            self._safe_send(client, message)
            for client in self._active_connections.values()
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_send(self, websocket: WebSocket, message: dict):
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
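To show how the manager plugs into the service, here is a minimal sketch of a FastAPI endpoint using it; the shared manager instance, the /ws/{client_id} route, and the broadcast-on-receive behaviour are assumptions for illustration, not from the original post:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
manager = WebSocketManager()  # assumed shared, module-level instance

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(client_id, websocket)
    try:
        while True:
            # Pump incoming JSON messages and fan them out to everyone
            data = await websocket.receive_json()
            await manager.broadcast(data)
    except WebSocketDisconnect:
        # The client already closed the socket, so skip close() and just
        # remove it from the registry (disconnect() would try to close again).
        manager._active_connections.pop(client_id, None)
        manager._connection_counter -= 1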
Message processing engine
import asyncio
import logging

import aiohttp
import aioredis

logger = logging.getLogger(__name__)

class MessageProcessor:
    def __init__(self):
        self.redis = aioredis.from_url(
            "redis://localhost",
            encoding="utf-8",
            decode_responses=True
        )
        self.batch_size = 1000
        self.processing_queue = asyncio.Queue()

    async def start(self):
        # Launch multiple consumer tasks
        consumers = [
            asyncio.create_task(self._consume_messages())
            for _ in range(10)
        ]
        await asyncio.gather(*consumers)

    async def _consume_messages(self):
        while True:
            batch = []
            try:
                # Collect messages into a batch, stopping early if the queue drains
                while len(batch) < self.batch_size:
                    message = await self.processing_queue.get()
                    batch.append(message)
                    if self.processing_queue.empty():
                        break
                await self._process_batch(batch)
            except Exception as e:
                logger.error(f"Error processing batch: {e}")

    async def _process_batch(self, messages: list):
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._process_single_message(session, msg)
                for msg in messages
            ]
            await asyncio.gather(*tasks)
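The original post does not show _process_single_message; here is a plausible sketch, assuming each message carries the URL of a downstream HTTP service it should be forwarded to (the callback_url field is an assumption made purely for illustration):

    async def _process_single_message(self, session: aiohttp.ClientSession, msg: dict):
        # Hypothetical handler: POST the message to a downstream service and
        # log failures instead of letting one bad message kill the whole batch.
        try:
            async with session.post(msg["callback_url"], json=msg) as resp:
                resp.raise_for_status()
        except Exception as e:
            logger.error(f"Failed to process message: {e}")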
3. Performance Optimization in Practice
Memory optimization
import asyncio

import psutil

class MemoryOptimizedQueue:
    def __init__(self, maxsize=10000):
        self._queue = asyncio.Queue(maxsize=maxsize)
        self._memory_limit = 1024 * 1024 * 100  # 100MB

    async def put(self, item):
        current_memory = psutil.Process().memory_info().rss
        if current_memory > self._memory_limit:
            # Trigger memory cleanup before accepting new work
            await self._cleanup()
        await self._queue.put(item)

    async def _cleanup(self):
        # Cleanup strategy: drop the queued backlog
        while not self._queue.empty():
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
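As written, the wrapper only exposes put(), so a consumer such as MessageProcessor._consume_messages could not use it directly in place of asyncio.Queue. Below is a small sketch of the delegating methods that would make it a drop-in replacement; these are additions of mine, not part of the original code:

    # Possible additions so the wrapper also covers the consumer side:
    async def get(self):
        return await self._queue.get()

    def get_nowait(self):
        return self._queue.get_nowait()

    def empty(self):
        return self._queue.empty()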
Connection pool optimization
import aioredis

class ConnectionPoolManager:
    def __init__(self):
        self._pools = {}
        self._max_connections = 1000
        self._ttl = 300  # connection time-to-live in seconds

    async def get_connection(self, service: str):
        if service not in self._pools:
            self._pools[service] = await self._create_pool(service)
        return await self._pools[service].acquire()

    async def _create_pool(self, service: str):
        # Note: create_pool is the legacy aioredis 1.x API (aioredis 2.x uses ConnectionPool / from_url instead)
        return await aioredis.create_pool(
            f'redis://{service}',
            minsize=20,
            maxsize=self._max_connections,
            encoding='utf-8',
            decode_responses=True
        )
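One thing the manager leaves to callers is returning connections to the pool; without that, acquire() eventually exhausts it. A minimal sketch of a release helper, assuming the aioredis 1.x pool object returned by create_pool (which exposes a release() method):

    async def release_connection(self, service: str, conn):
        # Hand the connection back so acquire() can reuse it instead of opening a new one.
        pool = self._pools.get(service)
        if pool is not None:
            pool.release(conn)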
4. Monitoring System
Performance metrics collection
import asyncio

# Assuming the standard prometheus_client Counter/Histogram types,
# which match the constructor signatures used below.
from prometheus_client import Counter, Histogram

class WebSocketMetrics:
    def __init__(self):
        self.metrics = {
            'connection_count': Counter(
                'websocket_connections_total',
                'Total WebSocket connections'
            ),
            'message_latency': Histogram(
                'message_processing_seconds',
                'Message processing latency'
            ),
            'error_count': Counter(
                'websocket_errors_total',
                'Total WebSocket errors'
            )
        }

    async def collect_metrics(self):
        while True:
            # prometheus_client collectors expose a synchronous collect()
            for metric in self.metrics.values():
                metric.collect()
            await asyncio.sleep(10)
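The latency histogram is only useful if the processing path actually records into it. A minimal sketch of instrumenting a handler with prometheus_client's Histogram.time() context manager and Counter.inc(); the handler name and the shared ws_metrics instance are illustrative:

ws_metrics = WebSocketMetrics()  # assumed shared instance

async def handle_message(message: dict):
    # time() observes the elapsed wall-clock time for the block on exit
    with ws_metrics.metrics['message_latency'].time():
        await process_message(message)   # placeholder for the real handler

def record_send_failure():
    ws_metrics.metrics['error_count'].inc()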
Health check
import asyncio
import logging

import psutil

logger = logging.getLogger(__name__)

class HealthCheck:
    def __init__(self, ws_manager):
        self.ws_manager = ws_manager

    async def check_health(self):
        while True:
            try:
                stats = {
                    'connections': len(self.ws_manager._active_connections),
                    'memory_usage': psutil.Process().memory_info().rss,
                    'cpu_usage': psutil.Process().cpu_percent()
                }
                if self._should_alert(stats):
                    await self._send_alert(stats)
            except Exception as e:
                logger.error(f"Health check failed: {e}")
            await asyncio.sleep(60)
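_should_alert and _send_alert are not shown in the original; here is a plausible sketch with purely illustrative thresholds:

    # Hypothetical implementations; the thresholds are examples, not values from the original post.
    def _should_alert(self, stats: dict) -> bool:
        return (
            stats['connections'] > 90_000             # nearing the 100k per-machine target
            or stats['memory_usage'] > 8 * 1024 ** 3  # more than 8 GB RSS
            or stats['cpu_usage'] > 90                # sustained high CPU
        )

    async def _send_alert(self, stats: dict):
        # Placeholder: in practice this would push to an alerting channel (webhook, PagerDuty, ...)
        logger.warning(f"Health alert: {stats}")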
5. Results
Performance gains:
Per-machine connections: 10,000 → 100,000
Message processing latency: 50ms → 5ms
Memory usage: reduced by 40%
Stability gains:
Continuous uptime: 3 days → 30 days
Error rate: 1% → 0.01%
6. Lessons Learned
Optimization principles:
Use asynchronous I/O
Process in batches
Pool your resources
Put monitoring in place first
Practical advice:
Use coroutines judiciously
Pay attention to memory management
Monitor and alert in real time
Degrade gracefully (see the sketch after this list)
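On that last point, a minimal sketch of what graceful degradation can look like for broadcasts under load; the threshold and the idea of dropping low-priority messages are my assumptions, not something the original post specifies:

import logging

logger = logging.getLogger(__name__)

OVERLOAD_THRESHOLD = 80_000  # illustrative per-machine connection threshold

async def degraded_broadcast(manager, message: dict):
    # manager: the WebSocketManager from section 2.
    # Under overload, shed non-critical traffic instead of letting latency blow up.
    if manager._connection_counter > OVERLOAD_THRESHOLD and message.get("priority") != "high":
        logger.debug("Dropping low-priority broadcast under overload")
        return
    await manager.broadcast(message)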
This optimization effort not only improved system performance, it also deepened my understanding of Python coroutines and asynchronous programming. Remember: coroutines are a powerful tool for high-concurrency workloads, but they have to be used with care.