首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

深入Python协程:一次千万级WebSocket服务优化实践

最近完成了一个大规模WebSocket服务器的优化工作,将单机连接数从1万提升到10万,集群总连接数达到千万级。今天分享这次优化的技术细节和实战经验。

一、原始架构痛点

最初的WebSocket服务存在的问题:

# 传统同步处理方式

class WebSocketServer:

  def __init__(self):

      self.clients = {}

  def handle_client(self, websocket):

      while True:

          message = websocket.recv()  # 阻塞操作

          self.process_message(message)

二、基于协程的重构方案

异步WebSocket服务器

import asyncio

from fastapi import FastAPI, WebSocket

from typing import Dict

class WebSocketManager:

  def __init__(self):

      self._active_connections: Dict[str, WebSocket] = {}

      self._message_queue = asyncio.Queue()

      self._connection_counter = 0

  asyncdef connect(self, client_id: str, websocket: WebSocket):

      await websocket.accept()

      self._active_connections[client_id] = websocket

      self._connection_counter += 1

      # 连接监控

      metrics.gauge(

          'websocket_connections',

          self._connection_counter

      )

  asyncdef disconnect(self, client_id: str):

      if client_id in self._active_connections:

          await self._active_connections[client_id].close()

          del self._active_connections[client_id]

          self._connection_counter -= 1

  asyncdef broadcast(self, message: dict):

      # 使用gather进行并发广播

      tasks = [

          self._safe_send(client, message)

          for client in self._active_connections.values()

      ]

      await asyncio.gather(*tasks, return_exceptions=True)

  asyncdef _safe_send(self, websocket: WebSocket, message: dict):

      try:

          await websocket.send_json(message)

      except Exception as e:

          logger.error(f"Failed to send message: {e}")

消息处理引擎

class MessageProcessor:

  def __init__(self):

      self.redis = aioredis.from_url(

          "redis://localhost",

          encoding="utf-8",

          decode_responses=True

      )

      self.batch_size = 1000

      self.processing_queue = asyncio.Queue()

  asyncdef start(self):

      # 启动多个消费者

      consumers = [

          asyncio.create_task(self._consume_messages())

          for _ in range(10)

      ]

      await asyncio.gather(*consumers)

  asyncdef _consume_messages(self):

      whileTrue:

          batch = []

          try:

              # 批量处理消息

              while len(batch) < self.batch_size:

                  message = await self.processing_queue.get()

                  batch.append(message)

                  if self.processing_queue.empty():

                      break

              await self._process_batch(batch)

          except Exception as e:

              logger.error(f"Error processing batch: {e}")

  asyncdef _process_batch(self, messages: list):

      asyncwith aiohttp.ClientSession() as session:

          tasks = [

              self._process_single_message(session, msg)

              for msg in messages

          ]

          await asyncio.gather(*tasks)

三、性能优化实践

内存优化

class MemoryOptimizedQueue:

  def __init__(self, maxsize=10000):

      self._queue = asyncio.Queue(maxsize=maxsize)

      self._memory_limit = 1024 * 1024 * 100# 100MB

  asyncdef put(self, item):

      current_memory = psutil.Process().memory_info().rss

      if current_memory > self._memory_limit:

          # 触发内存清理

          await self._cleanup()

      await self._queue.put(item)

  asyncdef _cleanup(self):

      # 清理策略实现

      whilenot self._queue.empty():

          try:

              self._queue.get_nowait()

          except asyncio.QueueEmpty:

              break

连接池优化

class ConnectionPoolManager:

  def __init__(self):

      self._pools = {}

      self._max_connections = 1000

      self._ttl = 300# 连接存活时间

  asyncdef get_connection(self, service: str):

      if service notin self._pools:

          self._pools[service] = await self._create_pool(service)

      returnawait self._pools[service].acquire()

  asyncdef _create_pool(self, service: str):

      returnawait aioredis.create_pool(

          f'redis://{service}',

          minsize=20,

          maxsize=self._max_connections,

          encoding='utf-8',

          decode_responses=True

      )

四、监控系统实现

性能指标收集

class WebSocketMetrics:

  def __init__(self):

      self.metrics = {

          'connection_count': Counter(

              'websocket_connections_total',

              'Total WebSocket connections'

          ),

          'message_latency': Histogram(

              'message_processing_seconds',

              'Message processing latency'

          ),

          'error_count': Counter(

              'websocket_errors_total',

              'Total WebSocket errors'

          )

      }

  asyncdef collect_metrics(self):

      whileTrue:

          for metric in self.metrics.values():

              await metric.collect()

          await asyncio.sleep(10)

健康检查

class HealthCheck:

  def __init__(self, ws_manager):

      self.ws_manager = ws_manager

  asyncdef check_health(self):

      whileTrue:

          try:

              stats = {

                  'connections': len(self.ws_manager._active_connections),

                  'memory_usage': psutil.Process().memory_info().rss,

                  'cpu_usage': psutil.Process().cpu_percent()

              }

              if self._should_alert(stats):

                  await self._send_alert(stats)

          except Exception as e:

              logger.error(f"Health check failed: {e}")

          await asyncio.sleep(60)

五、优化成果

性能提升:

单机连接数:1万 10万

消息处理延迟:50ms 5ms

内存使用:优化40%

稳定性提升:

系统运行时间:3天 30天

错误率:1% 0.01%

六、经验总结

优化原则:

使用异步IO

批量处理

资源池化

监控先行

实践建议:

合理使用协程

注意内存管理

实时监控告警

优雅降级处理

这次优化实践不仅提升了系统性能,也加深了对Python协程和异步编程的理解。记住,在处理高并发场景时,协程是一个强大的工具,但需要合理使用。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OkW1vkWbHAUfFfDZ5ddkRuow0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券