在当今数据密集型应用和大模型部署的时代,批量推理已成为提升系统性能和资源利用率的关键技术。随着深度学习模型规模的不断扩大和应用场景的日益复杂,如何高效地处理大量推理请求成为技术团队面临的重要挑战。传统的同步API调用方式在面对高并发、大规模数据处理时,往往会遇到响应延迟高、资源利用不充分等问题。异步API调用作为一种更高效的处理模式,通过非阻塞操作和并发处理能力,为批量推理场景提供了理想的解决方案。
本文将深入探讨批量推理中异步API调用的核心概念、实现技术、优化策略以及最佳实践。我们将从异步编程的基础原理出发,详细介绍Python中asyncio和aiohttp的使用方法,分析批量请求的构建与管理策略,探讨重试机制的设计与实现,并通过实际案例展示异步API调用在各类应用场景中的优势。此外,我们还将讨论性能优化技术、错误处理策略以及未来发展趋势,为读者提供全面而实用的批量推理异步处理指南。
在批量推理场景中,传统的同步API调用方式存在以下局限性:
相比之下,异步API调用具有显著优势:
批量推理异步API调用适用于以下场景:
通过采用异步API调用,企业可以显著降低推理服务的延迟,提高系统吞吐量,优化资源利用率,从而提升用户体验并降低运营成本。
要理解异步API调用,首先需要明确同步和异步编程的本质区别。
**同步编程(Synchronous Programming)**是一种传统的编程模型,其中操作按照顺序依次执行,每个操作必须等待前一个操作完成后才能开始。在同步编程中,当程序执行到一个需要等待的操作(如网络请求、文件IO)时,整个程序会被阻塞,直到该操作完成。
任务1执行 → 等待任务1完成 → 任务2执行 → 等待任务2完成 → 任务3执行**异步编程(Asynchronous Programming)**则是一种非阻塞的编程模型,允许程序在等待某些操作完成的同时继续执行其他任务。当遇到需要等待的操作时,程序不会被阻塞,而是注册一个回调函数,当操作完成时通过事件循环触发回调函数的执行。
任务1开始 → 任务2开始 → 任务3开始 → 任务1完成 → 任务2完成 → 任务3完成协程(Coroutine)是实现异步编程的核心概念,它是一种可以暂停执行并在稍后恢复的函数。在Python中,协程使用async/await语法来定义和使用。
事件循环(Event Loop)是异步编程的中央调度器,负责管理协程的执行、处理IO事件、调度回调函数等。事件循环维护一个任务队列,按照一定的优先级和调度策略执行任务。
下面是协程和事件循环的基本工作流程:
await表达式时,协程暂停执行,控制权返回给事件循环await的操作完成时,协程重新加入到可执行队列Python提供了丰富的异步编程工具,其中最核心的是asyncio库和基于它的各种框架。
asyncio是Python 3.4引入的标准库,提供了完整的异步编程支持,包括事件循环、协程、任务、Future等组件。
aiohttp是基于asyncio的异步HTTP客户端/服务器框架,提供了高性能的异步HTTP请求功能。
httpx是另一个现代化的异步HTTP客户端,支持同步和异步两种API风格。
trio是一个更现代的异步编程库,提供了更安全、更易用的异步编程模型。
aiohttp是Python中最流行的异步HTTP客户端库之一,它基于asyncio构建,提供了高性能的异步HTTP请求功能。aiohttp不仅支持客户端操作,还支持服务器端开发,是构建异步Web应用的理想选择。
安装aiohttp非常简单,可以通过pip命令安装:
pip install aiohttp对于批量推理场景,我们可能还需要安装一些额外的依赖包:
pip install aiohttp[speedups] # 安装可选依赖,提升性能
pip install async_timeout # 用于设置超时aiohttp的核心组件包括ClientSession、ClientRequest、ClientResponse等:
ClientSession使用连接池来复用TCP连接,这对于批量API调用非常重要,可以显著减少连接建立和断开的开销。默认情况下,aiohttp会为每个域名维护一个连接池,最大连接数为100。
下面是使用aiohttp进行异步HTTP请求的基本示例:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'https://example.com')
print(html)
if __name__ == '__main__':
asyncio.run(main())在这个示例中,我们创建了一个ClientSession对象,然后使用它发送GET请求。async with语句确保会话在使用完毕后正确关闭,避免资源泄露。await表达式用于等待异步操作完成。
httpx是一个更现代的异步HTTP客户端,提供了与requests类似的API,但支持异步操作。httpx的主要优势包括:
下面是使用httpx进行异步HTTP请求的示例:
import httpx
import asyncio
async def main():
async with httpx.AsyncClient() as client:
response = await client.get('https://example.com')
print(response.text)
asyncio.run(main())特性 | aiohttp | httpx |
|---|---|---|
异步支持 | 是 | 是 |
同步支持 | 否 | 是 |
HTTP/2支持 | 是 | 是 |
连接池 | 是 | 是 |
文档质量 | 好 | 优秀 |
社区活跃度 | 高 | 中高 |
性能 | 高 | 高 |
API设计 | 独特 | 类似requests |
对于批量推理场景,两种库都能满足需求,但选择哪种库取决于具体的项目需求和团队熟悉程度。如果团队已经熟悉requests,那么httpx可能更容易上手;如果追求极致性能,aiohttp可能是更好的选择。
在批量推理场景中,如何有效地组织和管理请求是关键。以下是几种常用的任务批处理策略:
批量大小是指一次同时发送的请求数量。设置合适的批量大小对于性能优化至关重要:
一般来说,可以根据以下因素确定合适的批量大小:
对于大多数场景,批量大小可以设置为50-200之间,然后通过性能测试进行调优。
速率限制是控制API调用频率的重要机制,可以避免对服务器造成过大压力,也可以避免触发API提供商的限流策略。
常用的速率限制实现方式包括:
在异步编程中,可以使用asyncio.sleep()来实现简单的速率限制:
async def rate_limited_request(session, url, rate_limit=10):
# 每1/rate_limit秒发送一个请求
await asyncio.sleep(1/rate_limit)
async with session.get(url) as response:
return await response.text()对于更复杂的速率限制需求,可以使用专门的库,如aiolimiter:
from aiolimiter import AsyncLimiter
# 创建一个限流器,每秒最多10个请求
limiter = AsyncLimiter(10, 1)
async def limited_request(session, url):
async with limiter:
async with session.get(url) as response:
return await response.text()在某些场景中,不同的推理请求可能有不同的优先级。例如,实时用户请求可能比批量处理任务更紧急。在这种情况下,可以使用优先级队列来管理请求:
import asyncio
import heapq
class PriorityQueue:
def __init__(self):
self._queue = []
self._index = 0
async def put(self, item, priority):
# 优先级越低,越先执行
heapq.heappush(self._queue, (priority, self._index, item))
self._index += 1
async def get(self):
return heapq.heappop(self._queue)[-1]
async def process_priority_requests(queue):
while True:
request = await queue.get()
# 处理请求
await process_request(request)
queue.task_done()对于大规模的推理任务,通常需要将请求分成多个批次进行处理。以下是几种常用的分组策略:
根据请求的数据量或复杂程度进行分组,确保每个批次的总处理时间大致相同:
def group_by_complexity(requests, complexity_threshold=100):
groups = []
current_group = []
current_complexity = 0
for request in requests:
request_complexity = calculate_complexity(request)
if current_complexity + request_complexity > complexity_threshold:
groups.append(current_group)
current_group = [request]
current_complexity = request_complexity
else:
current_group.append(request)
current_complexity += request_complexity
if current_group:
groups.append(current_group)
return groups根据请求到达的时间进行分组,定期批量处理积累的请求:
import asyncio
from collections import deque
class TimeBasedBatcher:
def __init__(self, batch_interval=0.1, max_batch_size=100):
self.queue = deque()
self.batch_interval = batch_interval
self.max_batch_size = max_batch_size
self.lock = asyncio.Lock()
async def add(self, request):
async with self.lock:
self.queue.append(request)
# 如果队列达到最大批量大小,立即处理
if len(self.queue) >= self.max_batch_size:
return await self.process_batch()
async def process_batch(self):
async with self.lock:
if not self.queue:
return []
batch = list(self.queue)
self.queue.clear()
return await process_requests(batch)
async def start(self):
while True:
await asyncio.sleep(self.batch_interval)
await self.process_batch()连接池是提升批量API调用性能的关键组件,合理配置连接池参数可以显著提高请求吞吐量。
对于aiohttp,主要的连接池配置参数包括:
以下是配置aiohttp连接池的示例:
import aiohttp
connector = aiohttp.TCPConnector(
limit=100, # 最大连接数
limit_per_host=50, # 每个主机的最大连接数
keepalive_timeout=30, # 连接保持活跃的时间(秒)
force_close=False # 会话关闭时不强制关闭连接
)
async with aiohttp.ClientSession(connector=connector) as session:
# 使用会话发送请求
pass连接复用是提高性能的重要手段,以下是一些优化连接复用的策略:
DNS解析也是API调用延迟的一个重要来源,启用DNS缓存可以减少DNS解析的开销:
connector = aiohttp.TCPConnector(
enable_cleanup_closed=True, # 清理已关闭的连接
ttl_dns_cache=300, # DNS缓存的生存时间(秒)
)在批量API调用中,网络抖动、服务器临时不可用等问题时有发生,重试机制是保证系统鲁棒性的重要手段。
常用的重试策略包括:
指数退避是最常用的重试策略之一,它可以有效避免重试风暴,给服务器留出恢复的时间。以下是指数退避算法的实现示例:
import asyncio
import random
async def exponential_backoff_retry(func, max_retries=3, base_delay=0.1, max_delay=10):
retries = 0
while True:
try:
return await func()
except Exception as e:
retries += 1
if retries > max_retries:
raise Exception(f"Maximum retries ({max_retries}) exceeded") from e
# 计算退避时间:base_delay * (2^(retries-1)) + 随机抖动
delay = min(base_delay * (2 ** (retries - 1)), max_delay)
# 添加随机抖动,避免多个客户端同时重试
jitter = random.uniform(0.8, 1.2)
actual_delay = delay * jitter
print(f"Attempt {retries} failed, retrying in {actual_delay:.2f} seconds...")
await asyncio.sleep(actual_delay)不是所有的错误都适合重试,例如,对于404 Not Found错误,重试通常是没有意义的。因此,需要根据错误类型决定是否重试:
import aiohttp
async def selective_retry(session, url, max_retries=3):
retryable_status_codes = {500, 502, 503, 504, 429}
retries = 0
while True:
try:
async with session.get(url) as response:
if response.status in retryable_status_codes:
retries += 1
if retries > max_retries:
response.raise_for_status()
delay = 0.1 * (2 ** (retries - 1))
print(f"Received status {response.status}, retrying in {delay} seconds...")
await asyncio.sleep(delay)
continue
response.raise_for_status()
return await response.text()
except aiohttp.ClientConnectorError:
# 连接错误也可以重试
retries += 1
if retries > max_retries:
raise
delay = 0.1 * (2 ** (retries - 1))
print(f"Connection error, retrying in {delay} seconds...")
await asyncio.sleep(delay)断路器模式是一种更高级的容错机制,它可以在检测到服务持续不可用时,暂时停止对该服务的请求,避免资源浪费。以下是断路器模式的简单实现:
import asyncio
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=30):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.lock = asyncio.Lock()
async def execute(self, func):
async with self.lock:
# 检查断路器状态
if self.state == CircuitState.OPEN:
# 检查是否可以尝试半开状态
if (asyncio.get_event_loop().time() - self.last_failure_time >
self.reset_timeout):
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is open")
try:
result = await func()
async with self.lock:
# 成功执行,重置状态
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
async with self.lock:
self.failure_count += 1
self.last_failure_time = asyncio.get_event_loop().time()
if (self.state == CircuitState.CLOSED and
self.failure_count >= self.failure_threshold):
self.state = CircuitState.OPEN
elif self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.failure_count = self.failure_threshold
raise在批量API调用中,良好的错误处理机制可以确保系统的稳定性和可靠性。以下是一些关键的错误处理策略:
在异步HTTP请求中,常见的异常包括:
不同类型的异常需要不同的处理策略。例如,连接异常和超时异常通常可以重试,而400 Bad Request等客户端错误则通常不需要重试。
超时控制是错误处理的重要组成部分,可以避免请求无限期等待。在aiohttp中,可以为每个请求设置超时:
async def request_with_timeout(session, url, timeout=10):
try:
async with session.get(url, timeout=timeout) as response:
return await response.text()
except asyncio.TimeoutError:
print(f"Request to {url} timed out after {timeout} seconds")
raise对于更精细的超时控制,可以使用async_timeout库:
import async_timeout
async def request_with_async_timeout(session, url, timeout=10):
try:
async with async_timeout.timeout(timeout):
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
print(f"Request to {url} timed out after {timeout} seconds")
raise详细的错误日志对于问题排查和系统监控至关重要。以下是一个日志记录的示例:
import logging
import traceback
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def request_with_logging(session, url, request_id):
try:
logger.info(f"Starting request {request_id}: {url}")
start_time = asyncio.get_event_loop().time()
async with session.get(url) as response:
content = await response.text()
duration = asyncio.get_event_loop().time() - start_time
logger.info(f"Request {request_id} completed in {duration:.2f}s with status {response.status}")
return content
except Exception as e:
logger.error(f"Request {request_id} failed: {str(e)}")
logger.debug(traceback.format_exc())
raise在批量处理中,通常需要一个全局的异常处理机制,确保即使部分请求失败,整个批处理过程也能继续进行:
async def process_batch_with_exception_handling(requests):
results = []
errors = []
async with aiohttp.ClientSession() as session:
tasks = [process_single_request(session, request) for request in requests]
# 使用gather而不是wait,可以获取所有任务的结果
all_results = await asyncio.gather(
*tasks,
return_exceptions=True # 让异常不中断其他任务
)
for i, result in enumerate(all_results):
if isinstance(result, Exception):
errors.append((i, result))
else:
results.append(result)
return results, errors并发控制是优化批量API调用性能的关键。以下是一些常用的并发控制策略:
使用信号量可以限制同时运行的协程数量,避免并发过多导致系统资源耗尽:
async def process_with_semaphore(requests, max_concurrency=50):
semaphore = asyncio.Semaphore(max_concurrency)
async def bounded_request(request):
async with semaphore:
return await process_request(request)
tasks = [bounded_request(request) for request in requests]
return await asyncio.gather(*tasks)对于大量请求,可以将其分成多个批次,逐批处理:
async def batch_process(requests, batch_size=100, max_concurrency=50):
results = []
semaphore = asyncio.Semaphore(max_concurrency)
async def bounded_request(request):
async with semaphore:
return await process_request(request)
for i in range(0, len(requests), batch_size):
batch = requests[i:i+batch_size]
print(f"Processing batch {i//batch_size + 1}/{(len(requests)+batch_size-1)//batch_size}")
tasks = [bounded_request(request) for request in batch]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)
return results在处理大量请求时,内存管理也是一个重要的考虑因素:
以下是流式处理响应的示例:
async def stream_processing(session, url):
async with session.get(url) as response:
async for chunk in response.content.iter_any():
# 处理每个数据块
yield process_chunk(chunk)启用压缩可以减少网络传输的数据量,提高请求速度:
connector = aiohttp.TCPConnector(force_close=True)
headers = {'Accept-Encoding': 'gzip, deflate, br'}
async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
async with session.get(url) as response:
# aiohttp会自动处理压缩
content = await response.text()对于重复的请求,可以使用缓存来避免不必要的网络调用:
from functools import lru_cache
# 注意:这只适用于同步函数,对于异步函数需要使用专门的缓存装饰器
@lru_cache(maxsize=1000)
def get_cached_data(key):
# 同步获取数据的函数
pass
# 对于异步函数,可以使用 aiocache 库
from aiocache import cached
from aiocache.serializers import JsonSerializer
@cached(ttl=3600, serializer=JsonSerializer())
async def get_cached_async_data(key):
# 异步获取数据的函数
pass某公司部署了一个大型语言模型服务,需要处理来自多个业务系统的批量推理请求。每个请求包含一个文本输入,模型需要生成相应的输出。服务需要支持高并发,并且对响应时间有严格要求。
客户端 → API网关 → 异步处理服务 → 模型服务 → 异步处理服务 → API网关 → 客户端import asyncio
import aiohttp
import logging
from typing import List, Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncInferenceService:
def __init__(self, model_url: str, max_concurrency: int = 100):
self.model_url = model_url
self.semaphore = asyncio.Semaphore(max_concurrency)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def _process_single_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
async with self.semaphore:
retry_count = 0
max_retries = 3
while retry_count <= max_retries:
try:
async with self.session.post(
self.model_url,
json=request_data,
timeout=30
) as response:
if response.status == 200:
return await response.json()
elif response.status in (500, 502, 503, 504, 429):
retry_count += 1
if retry_count > max_retries:
raise Exception(f"Request failed with status {response.status}")
# 指数退避
delay = 0.1 * (2 ** (retry_count - 1))
logger.warning(f"Request failed, retrying in {delay}s...")
await asyncio.sleep(delay)
else:
response.raise_for_status()
except asyncio.TimeoutError:
retry_count += 1
if retry_count > max_retries:
raise
delay = 0.1 * (2 ** (retry_count - 1))
logger.warning(f"Request timed out, retrying in {delay}s...")
await asyncio.sleep(delay)
async def process_batch(self, batch_data: List[Dict[str, Any]], batch_size: int = 50) -> List[Dict[str, Any]]:
results = []
# 分批处理
for i in range(0, len(batch_data), batch_size):
current_batch = batch_data[i:i+batch_size]
logger.info(f"Processing batch {i//batch_size + 1}/{(len(batch_data)+batch_size-1)//batch_size}")
tasks = [self._process_single_request(item) for item in current_batch]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果和异常
for j, result in enumerate(batch_results):
if isinstance(result, Exception):
logger.error(f"Request {i+j} failed: {str(result)}")
results.append({"error": str(result)})
else:
results.append(result)
return results
async def main():
# 创建测试数据
batch_data = [
{"text": f"Sample text {i}", "parameters": {"max_tokens": 100}}
for i in range(1000)
]
async with AsyncInferenceService("http://localhost:8000/inference") as service:
results = await service.process_batch(batch_data)
print(f"Processed {len(results)} requests")
if __name__ == "__main__":
asyncio.run(main())处理方式 | 请求数量 | 总耗时(秒) | 吞吐量(请求/秒) |
|---|---|---|---|
同步处理 | 1000 | 120 | 8.3 |
异步处理 | 1000 | 15 | 66.7 |
从上面的对比可以看出,异步处理方式的性能显著优于同步处理方式,吞吐量提高了约8倍。
某社交媒体平台需要对用户上传的大量图片进行内容审核,包括人脸识别、敏感内容检测等。系统需要高效地处理这些图片,确保及时完成审核。
import asyncio
import aiohttp
import base64
import os
from typing import List, Dict, Any
class ImageRecognitionService:
def __init__(self, api_key: str, max_concurrency: int = 50):
self.api_key = api_key
self.base_url = "https://api.image-recognition.example.com/v1/analyze"
self.semaphore = asyncio.Semaphore(max_concurrency)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
})
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def _encode_image(self, image_path: str) -> str:
# 异步读取文件内容(在实际应用中可能需要使用更高效的方法)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: base64.b64encode(open(image_path, 'rb').read()).decode('utf-8')
)
async def _process_single_image(self, image_path: str) -> Dict[str, Any]:
async with self.semaphore:
try:
# 编码图片
image_data = await self._encode_image(image_path)
# 发送请求
payload = {
"image": image_data,
"features": ["face_detection", "content_moderation"]
}
async with self.session.post(
self.base_url,
json=payload,
timeout=60
) as response:
response.raise_for_status()
return await response.json()
except Exception as e:
return {"error": str(e), "path": image_path}
async def process_images(self, image_paths: List[str]) -> List[Dict[str, Any]]:
tasks = [self._process_single_image(path) for path in image_paths]
return await asyncio.gather(*tasks, return_exceptions=True)
async def main():
# 获取所有图片路径
image_dir = "/path/to/images"
image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith((".jpg", ".jpeg", ".png"))]
# 处理图片
async with ImageRecognitionService(api_key="your-api-key") as service:
results = await service.process_images(image_paths[:100]) # 处理前100张图片
# 统计结果
success_count = sum(1 for r in results if not isinstance(r, Exception) and "error" not in r)
error_count = len(results) - success_count
print(f"Processed {len(results)} images: {success_count} successful, {error_count} errors")
if __name__ == "__main__":
asyncio.run(main())某电商平台需要为用户提供实时商品推荐,推荐系统需要根据用户的浏览历史、购买记录等信息,调用推荐模型API为多个用户同时生成推荐结果。
import asyncio
import aiohttp
import json
from typing import List, Dict, Any
class RecommendationService:
def __init__(self, api_url: str, max_concurrency: int = 100, rate_limit: int = 50):
self.api_url = api_url
self.semaphore = asyncio.Semaphore(max_concurrency)
# 使用令牌桶算法实现速率限制
self.tokens = asyncio.Queue()
self.rate_limit = rate_limit
self.rate_limit_task = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
# 启动速率限制任务
self.rate_limit_task = asyncio.create_task(self._fill_tokens())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.rate_limit_task.cancel()
try:
await self.rate_limit_task
except asyncio.CancelledError:
pass
await self.session.close()
async def _fill_tokens(self):
"""定期填充令牌桶"""
while True:
# 每秒生成rate_limit个令牌
for _ in range(self.rate_limit):
await self.tokens.put(None)
await asyncio.sleep(1)
async def _process_single_user(self, user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any]:
# 获取令牌
await self.tokens.get()
async with self.semaphore:
try:
async with self.session.post(
f"{self.api_url}/recommend",
json={
"user_id": user_id,
"user_data": user_data,
"num_recommendations": 10
},
timeout=10
) as response:
response.raise_for_status()
return await response.json()
except Exception as e:
return {"error": str(e), "user_id": user_id}
finally:
# 释放令牌
self.tokens.task_done()
async def batch_recommend(self, user_data_map: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
# 创建任务
tasks = {}
for user_id, user_data in user_data_map.items():
tasks[user_id] = self._process_single_user(user_id, user_data)
# 并发执行所有任务
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# 组装结果
final_results = {}
for user_id, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
final_results[user_id] = {"error": str(result)}
else:
final_results[user_id] = result
return final_results
async def main():
# 准备用户数据
user_data_map = {
f"user_{i}": {
"browse_history": [f"product_{j}" for j in range(i*10, i*10+5)],
"purchase_history": [f"product_{j}" for j in range(i*20, i*20+2)],
"preferences": {"category": f"category_{i % 5}"}
}
for i in range(200)
}
# 批量推荐
async with RecommendationService("http://localhost:8001", rate_limit=50) as service:
results = await service.batch_recommend(user_data_map)
# 统计结果
success_count = sum(1 for r in results.values() if "error" not in r)
error_count = len(results) - success_count
print(f"Generated recommendations for {success_count} users, {error_count} errors")
if __name__ == "__main__":
asyncio.run(main())将异步API调用服务容器化是一种常见的部署方式,可以提高服务的可移植性和可扩展性。以下是使用Docker容器化部署的示例:
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制并安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV PYTHONUNBUFFERED=1
# 运行应用
CMD ["python", "app.py"]aiohttp>=3.9.0
async_timeout>=4.0.3
aiolimiter>=1.1.0version: '3.8'
services:
inference-service:
build: .
restart: always
ports:
- "8000:8000"
environment:
- MODEL_API_URL=http://model-service:8080
- MAX_CONCURRENCY=100
- MAX_RETRIES=3
deploy:
resources:
limits:
cpus: '2'
memory: 2G
depends_on:
- model-service
model-service:
image: your-model-image:latest
restart: always
expose:
- "8080"
deploy:
resources:
limits:
cpus: '4'
memory: 8G对于生产环境中的异步API调用服务,良好的监控是确保服务稳定性和性能的关键。以下是一些重要的监控指标:
可以使用Prometheus和Grafana来构建监控系统:
import asyncio
import aiohttp
from prometheus_client import Counter, Histogram, start_http_server
# 定义指标
REQUEST_COUNT = Counter('api_requests_total', 'Total number of API requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('api_request_latency_seconds', 'API request latency in seconds', ['method', 'endpoint'])
RETRY_COUNT = Counter('api_retries_total', 'Total number of retries', ['endpoint'])
class MonitoredSession:
def __init__(self):
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def request(self, method, url, **kwargs):
# 解析端点
endpoint = url.split('/')[-1]
# 记录开始时间
start_time = asyncio.get_event_loop().time()
try:
# 发送请求
response = await self.session.request(method, url, **kwargs)
# 更新指标
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=response.status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(
asyncio.get_event_loop().time() - start_time
)
return response
except Exception:
# 更新错误计数
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status="error").inc()
raise
def start_monitoring_server(port=8000):
# 启动Prometheus指标服务器
start_http_server(port)
print(f"Monitoring server started on port {port}")
# 在应用启动时启动监控服务器
start_monitoring_server()详细的日志记录对于问题排查和性能分析至关重要。以下是一个结构化日志记录的示例:
import logging
import json
from pythonjsonlogger import jsonlogger
# 配置结构化日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 创建JSON格式的处理器
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s %(request_id)s %(user_id)s %(duration)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
async def process_with_logging(session, request_id, user_id, url):
start_time = asyncio.get_event_loop().time()
try:
logger.info(
"Starting request",
extra={
"request_id": request_id,
"user_id": user_id,
"url": url
}
)
async with session.get(url) as response:
content = await response.text()
duration = asyncio.get_event_loop().time() - start_time
logger.info(
"Request completed",
extra={
"request_id": request_id,
"user_id": user_id,
"status": response.status,
"duration": duration
}
)
return content
except Exception as e:
duration = asyncio.get_event_loop().time() - start_time
logger.error(
"Request failed",
extra={
"request_id": request_id,
"user_id": user_id,
"error": str(e),
"duration": duration
}
)
raise随着异步编程的普及,越来越多的工具和框架被开发出来,以简化异步API调用的实现和管理:
边缘计算将计算资源部署到离用户更近的位置,与异步处理结合可以进一步降低延迟:
机器学习和人工智能技术正在被应用到错误处理和重试策略中:
异步编程模式本身也在不断演进,变得更加易用和高效:
批量推理中的异步API调用是一种高效处理大规模请求的技术,通过非阻塞操作和并发处理能力,可以显著提高系统的吞吐量和资源利用率。本文详细介绍了异步编程的基础概念、实现技术、优化策略以及最佳实践,希望能够帮助读者更好地理解和应用异步API调用技术。
在实际应用中,需要根据具体的业务场景和系统需求,选择合适的技术栈和实现方案。异步API调用虽然可以带来显著的性能提升,但也引入了一定的复杂性,需要开发者具备良好的异步编程能力和错误处理意识。
随着技术的不断发展,异步编程模式和工具也在不断演进,为批量推理场景提供更高效、更可靠的解决方案。我们相信,异步API调用将在大模型部署和推理服务中发挥越来越重要的作用,为AI技术的广泛应用提供有力支持。