
在大语言模型(LLM)部署的最后一公里,API接口的设计与安全性直接决定了模型服务的可用性、稳定性与用户信任度。随着2025年LLM应用的爆炸式增长,如何构建高性能、高安全性的REST API成为开发者面临的核心挑战。FastAPI作为Python生态中最受青睐的Web框架之一,凭借其卓越的性能、强大的类型安全支持和完善的文档生成能力,已成为LLM服务化部署的首选方案。
根据Stack Overflow 2025年开发者调查数据,FastAPI的使用率较2024年增长了5个百分点,在高性能API开发领域占据主导地位。FastAPI的最新版本0.116.1(2025年7月11日发布)带来了对Starlette版本范围的升级和翻译支持优化,进一步提升了框架的稳定性和国际化能力。
本文将系统讲解如何使用FastAPI构建生产级LLM服务,重点关注REST端点的安全认证机制,涵盖JWT认证、OAuth2集成、API密钥管理等关键安全技术,并提供完整的代码实现和最佳实践指南。通过本文的学习,读者将能够构建满足企业级安全要求的LLM API服务。
FastAPI是一个基于Python类型提示的现代异步Web框架,其核心架构可以用公式表达:FastAPI = Starlette(异步) + Pydantic(类型) + OpenAPI(文档)。这一设计理念使其在性能、开发效率和文档质量方面同时具备优势。
FastAPI基于ASGI(异步服务器网关接口)标准构建,支持异步请求处理,能够处理每秒10^4级别的请求。在LLM服务场景中,这种高性能特性尤为重要,因为LLM推理通常是计算密集型任务,高效的API层可以最大化利用后端推理资源。
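异步并发的收益可以用标准库asyncio做一个极简模拟:10个各耗时约0.01秒的IO型请求并发执行时,总耗时接近单个请求而非总和(仅为示意,不代表真实基准):

```python
import asyncio
import time

async def fake_llm_call(i: int) -> str:
    # 用asyncio.sleep模拟一次IO等待(如等待后端推理服务返回)
    await asyncio.sleep(0.01)
    return f"response-{i}"

async def handle_concurrently(n: int) -> list:
    # 并发处理n个请求:总耗时接近单个请求,而非n倍
    return await asyncio.gather(*(fake_llm_call(i) for i in range(n)))

start = time.perf_counter()
results = asyncio.run(handle_concurrently(10))
elapsed = time.perf_counter() - start
print(len(results), round(elapsed, 3))
```

这正是ASGI框架在等待后端推理时仍能继续接收其他请求的原理。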
FastAPI通过Python的类型提示系统实现了自动的数据验证、序列化和文档生成,大大提升了开发效率。在LLM服务开发中,这意味着开发者可以将更多精力集中在模型集成和业务逻辑上,而不是繁琐的数据处理和文档维护工作。
FastAPI自动生成交互式API文档(Swagger UI和ReDoc),这对于LLM服务的测试和调试至关重要。开发者和用户可以直接在文档界面测试API接口,大大降低了集成难度。
构建企业级LLM API服务需要考虑多个层面的设计,包括前端接入、API网关、认证授权、模型推理和数据存储等。
用户请求 → API网关 → 认证授权层 → FastAPI应用 → 模型推理服务 → 响应返回

对于大型LLM服务,可以考虑采用微服务架构,将认证授权、模型推理、计量计费等功能模块拆分为独立服务。
一个典型的LLM API服务项目结构如下:
llm_api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI应用入口
│ ├── api/ # API路由模块
│ │ ├── __init__.py
│ │ ├── auth.py # 认证相关端点
│ │ ├── llm.py # LLM推理端点
│ │ └── health.py # 健康检查端点
│ ├── core/ # 核心配置模块
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ ├── security.py # 安全相关工具
│ │ └── logging.py # 日志配置
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── user.py # 用户模型
│ │ └── request.py # 请求模型
│ ├── schemas/ # Pydantic模型
│ │ ├── __init__.py
│ │ ├── user.py # 用户相关schema
│ │ └── llm.py # LLM相关schema
│ ├── services/ # 业务逻辑服务
│ │ ├── __init__.py
│ │ ├── auth_service.py # 认证服务
│ │ └── llm_service.py # LLM服务
│ └── utils/ # 工具函数
│ ├── __init__.py
│ └── validation.py # 验证工具
├── requirements.txt
├── .env.example
├── start.sh # 启动脚本
└── Dockerfile # Docker配置

下面是创建一个基本FastAPI应用的代码示例:
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api import auth, llm, health
from app.core.config import settings
from app.core.logging import configure_logging
# 配置日志
configure_logging()
# 创建FastAPI应用实例
app = FastAPI(
title="LLM API Service",
description="高性能大语言模型服务API",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.BACKEND_CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(auth.router, prefix="/api/auth", tags=["authentication"])
app.include_router(llm.router, prefix="/api/llm", tags=["llm"])
app.include_router(health.router, prefix="/api/health", tags=["health"])
@app.get("/")
async def root():
return {"message": "LLM API Service is running"}
@app.get("/version")
async def version():
return {"version": "1.0.0"}

使用Pydantic Settings管理应用配置,支持从环境变量读取配置项:
# app/core/config.py
from typing import List, Optional
from pydantic_settings import BaseSettings
from pydantic import field_validator
class Settings(BaseSettings):
PROJECT_NAME: str = "LLM API Service"
API_V1_STR: str = "/api/v1"
# 数据库配置
DATABASE_URL: str
# 安全配置
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS配置
BACKEND_CORS_ORIGINS: List[str] = []
# LLM服务配置
LLM_MODEL_NAME: str = "gpt2"
LLM_MAX_TOKENS: int = 1024
@field_validator("BACKEND_CORS_ORIGINS", mode="before")
@classmethod
def assemble_cors_origins(cls, v: str | List[str]) -> List[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

添加健康检查端点,用于监控API服务状态:
# app/api/health.py
from fastapi import APIRouter
from app.services.llm_service import LLMService
router = APIRouter()
llm_service = LLMService()
@router.get("/ping")
async def ping():
"""简单的健康检查端点"""
return {"status": "healthy"}
@router.get("/status")
async def status():
"""详细的服务状态检查"""
# 检查LLM服务状态
llm_status = await llm_service.health_check()
return {
"service": "LLM API",
"status": "running",
"llm_service": llm_status
}

在LLM服务部署中,API安全至关重要。未受保护的LLM API可能面临未授权访问、推理资源被滥用、敏感数据泄露以及调用费用失控等风险。
设计企业级LLM API的安全认证体系应遵循以下原则:
实现多层次的安全防护(纵深防御),在网关、认证授权与应用逻辑等多个层面分别设防,避免单点失效。
用户只能访问其完成任务所需的最小资源集。在LLM API中,可以根据用户角色限制其可使用的模型、推理参数或请求频率。
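最小权限可以落到具体的推理参数上。下面用纯Python示意按角色收紧max_tokens的做法(角色名与上限均为示例假设,并非本文后续代码的一部分):

```python
# 按用户角色限制可用的推理参数(角色名与上限均为示例值)
ROLE_LIMITS = {
    "free": {"max_tokens": 256},
    "pro": {"max_tokens": 2048},
}

def clamp_max_tokens(role: str, requested: int) -> int:
    # 超出角色上限时收紧到上限,而不是直接拒绝请求
    limit = ROLE_LIMITS.get(role, ROLE_LIMITS["free"])["max_tokens"]
    return min(requested, limit)

print(clamp_max_tokens("free", 1024))  # 256
print(clamp_max_tokens("pro", 1024))   # 1024
```

未知角色按最保守的"free"档处理,体现默认拒绝的思路。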
过于复杂的安全机制可能影响用户体验,需要在安全性和可用性之间找到平衡。例如,合理设置令牌过期时间,避免用户频繁重新登录。
所有API访问都应记录详细日志,包括用户身份、请求内容、访问时间等信息,便于安全审计和问题追溯。
认证方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
API Key | 实现简单,易于集成 | 安全性较低,容易泄露 | 内部系统集成,低频调用 |
JWT | 无状态,便于水平扩展 | 令牌无法主动撤销 | 普通用户认证,移动端应用 |
OAuth2 | 支持第三方授权,安全性高 | 实现复杂 | 需要第三方登录,企业SSO |
Session | 易于管理,支持主动失效 | 有状态,扩展困难 | 传统Web应用 |
HMAC | 高安全性,支持请求签名 | 实现复杂,客户端需要特殊处理 | 高安全要求的API,金融级应用 |
对于LLM API服务,推荐采用JWT作为主要认证方案,同时支持API Key用于系统集成场景。对于企业级应用,可以考虑集成OAuth2实现单点登录。
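表中提到的HMAC请求签名方案在本文后续未展开,这里用标准库给出一个最小示意(共享密钥与请求要素均为示例值):

```python
import hashlib
import hmac

def sign_request(secret: str, method: str, path: str, body: str, timestamp: str) -> str:
    # 将请求方法、路径、请求体与时间戳拼接后做HMAC-SHA256
    # 时间戳参与签名可抵御重放攻击(服务端还需校验时间偏差)
    message = "\n".join([method, path, body, timestamp])
    return hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()

secret = "shared-secret"  # 双方事先共享的密钥(示例)
sig = sign_request(secret, "POST", "/api/llm/generate", '{"prompt":"hi"}', "1700000000")

# 服务端用相同要素重新计算签名,并用恒定时间比较防止时序攻击
expected = sign_request(secret, "POST", "/api/llm/generate", '{"prompt":"hi"}', "1700000000")
print(hmac.compare_digest(sig, expected))  # True
```

请求体或时间戳被篡改后签名即不匹配,这是其高安全性的来源。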
JSON Web Token (JWT) 是一种基于JSON的开放标准(RFC 7519),用于在各方之间安全地传输信息。JWT由三部分组成:Header(头部,声明签名算法)、Payload(载荷,存放声明信息)和Signature(签名),三者以点号分隔。
JWT的基本工作流程如下:客户端用凭据登录,服务端校验通过后签发令牌;此后客户端在每次请求的Authorization头中携带令牌,服务端只需验证签名和过期时间即可确认身份,无需查询会话状态。
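三段式结构可以用标准库手工拼一个HS256令牌来直观展示(仅作教学演示,生产中请使用python-jose等成熟库):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWT使用无填充的URL-safe Base64
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_jwt(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    # 签名输入为 Header段.Payload段
    signing_input = b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(payload).encode())
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)

token = make_jwt({"sub": "alice", "exp": 1700000000}, "demo-secret")
print(token.count("."))  # 2:Header.Payload.Signature 三段
```

服务端验证时只需用同一密钥重新计算签名并比对,这就是JWT"无状态"的由来。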
下面是在FastAPI中实现JWT认证的核心代码:
# app/core/security.py
from datetime import datetime, timedelta
from typing import Any, Union, Optional
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
"""创建访问令牌"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""获取密码哈希值"""
return pwd_context.hash(password)

# app/models/user.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# app/schemas/user.py
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
class UserBase(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
is_active: Optional[bool] = True
is_superuser: Optional[bool] = False
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
class UserLogin(BaseModel):
username: str
password: str
class User(UserBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None

使用FastAPI的依赖注入系统实现JWT认证:
# app/api/deps.py
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.models.user import User
from app.schemas.user import TokenData
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
"""获取当前用户"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return user
async def get_current_active_superuser(
current_user: User = Depends(get_current_user),
) -> User:
"""获取当前活跃的超级用户"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

# app/api/auth.py
from typing import Any
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.core.security import verify_password, create_access_token
from app.models.user import User
from app.schemas.user import Token, UserCreate, User as UserSchema
router = APIRouter()
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
) -> Any:
"""用户登录"""
# 查找用户
user = db.query(User).filter(User.username == form_data.username).first()
# 验证用户和密码
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 检查用户是否激活
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
# 创建访问令牌
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
subject=user.username, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=UserSchema)
async def register(
user_in: UserCreate,
db: Session = Depends(get_db)
) -> Any:
"""用户注册"""
# 检查用户名是否已存在
user = db.query(User).filter(User.username == user_in.username).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# 检查邮箱是否已存在
user = db.query(User).filter(User.email == user_in.email).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# 创建新用户
from app.core.security import get_password_hash
db_user = User(
username=user_in.username,
email=user_in.email,
hashed_password=get_password_hash(user_in.password),
is_active=user_in.is_active,
is_superuser=user_in.is_superuser
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user

在实现JWT认证时,应遵循以下安全最佳实践:使用足够长且随机的SECRET_KEY并定期轮换;将访问令牌有效期设置得尽量短;仅通过HTTPS传输令牌;在服务端校验exp等标准声明。
JWT的一个缺点是一旦签发无法主动撤销,可通过以下方式实现伪撤销:
# 令牌撤销示例
import redis
from typing import Optional
from datetime import timedelta
from app.core.config import settings
# 初始化Redis连接
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
def revoke_token(token: str, expires_delta: Optional[timedelta] = None) -> None:
"""撤销令牌"""
if expires_delta:
redis_client.setex(f"revoked_token:{token}", expires_delta, "1")
else:
# 默认过期时间与访问令牌相同
redis_client.setex(
f"revoked_token:{token}",
timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
"1"
)
def is_token_revoked(token: str) -> bool:
"""检查令牌是否已撤销"""
return redis_client.exists(f"revoked_token:{token}") > 0

此外,应避免在JWT中存储密码、密钥等敏感信息:Payload只做Base64编码而非加密,任何拿到令牌的人都可以解码查看其内容。
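Payload只是Base64编码这一点可以用标准库直接验证(示例载荷为演示数据):

```python
import base64
import json

# 构造一个演示用的Payload段({"sub": "alice"}的无填充URL-safe Base64)
payload_b64 = base64.urlsafe_b64encode(json.dumps({"sub": "alice"}).encode()).rstrip(b"=").decode()

# 任何拿到令牌的人都可以这样还原Payload内容,无需知道签名密钥
padded = payload_b64 + "=" * (-len(payload_b64) % 4)
claims = json.loads(base64.urlsafe_b64decode(padded))
print(claims)  # {'sub': 'alice'}
```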
OAuth 2.0是一个授权框架,允许第三方应用以安全的方式获取对用户资源的有限访问权限。在LLM API服务中,集成OAuth2可以支持第三方应用授权,提高系统的可扩展性和用户体验。
OAuth2支持多种授权流程,适用于不同场景:
授权码流程(Authorization Code)是最安全的OAuth2流程,适用于有后端服务的Web应用:用户在授权服务器登录并授权后,应用后端用一次性授权码换取访问令牌,令牌不经过浏览器暴露。
客户端凭证流程(Client Credentials)适用于服务器到服务器的通信场景,如系统集成:调用方直接用自己的客户端ID和密钥向令牌端点换取访问令牌,不涉及最终用户。
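客户端凭证流程的请求体非常简单,可以示意如下(表单字段来自RFC 6749 §4.4,客户端ID、密钥与scope均为占位值):

```python
def build_client_credentials_request(client_id: str, client_secret: str, scope: str = "") -> dict:
    # OAuth2客户端凭证授权的标准表单字段
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scope:
        form["scope"] = scope
    return form

# 实际使用时将该表单POST到授权服务器的令牌端点,
# 例如 httpx.post(token_url, data=form),响应JSON中包含access_token
form = build_client_credentials_request("my-client", "my-secret", "llm:read")
print(form["grant_type"])  # client_credentials
```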
下面的示例不依赖额外的OAuth2客户端库,直接基于FastAPI内置的security工具和httpx实现GitHub授权码登录:
# 安装依赖
# pip install httpx
# app/api/oauth2.py
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2AuthorizationCodeBearer
from app.core.config import settings
from app.services.oauth_service import OAuthService
router = APIRouter()
oauth_service = OAuthService()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl=f"{settings.OAUTH2_AUTHORIZATION_URL}",
tokenUrl=f"{settings.OAUTH2_TOKEN_URL}"
)
@router.get("/login/github")
async def login_github():
"""GitHub登录入口"""
redirect_uri = f"{settings.BASE_URL}/api/auth/callback/github"
url = oauth_service.get_github_authorization_url(redirect_uri)
return RedirectResponse(url)
@router.get("/callback/github")
async def callback_github(
request: Request,
code: str
):
"""GitHub回调处理"""
try:
# 换取访问令牌
token = await oauth_service.github_exchange_code(code)
# 获取用户信息
user_info = await oauth_service.get_github_user_info(token)
# 创建或更新用户
db_user = oauth_service.create_or_update_user(
provider="github",
provider_user_id=user_info["id"],
username=user_info["login"],
email=user_info.get("email"),
avatar_url=user_info.get("avatar_url")
)
# 创建JWT令牌
access_token = oauth_service.create_user_token(db_user)
# 重定向到前端,携带令牌
return RedirectResponse(
url=f"{settings.FRONTEND_URL}/auth/callback?token={access_token}"
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)

# app/services/oauth_service.py
from typing import Dict, Any, Optional
from datetime import timedelta
import httpx
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.config import settings
from app.core.security import create_access_token
from app.models.user import User
from app.models.oauth_account import OAuthAccount
class OAuthService:
def __init__(self):
self.db: Session = next(get_db())
def get_github_authorization_url(self, redirect_uri: str) -> str:
"""获取GitHub授权URL"""
return (
f"https://github.com/login/oauth/authorize?"
f"client_id={settings.GITHUB_CLIENT_ID}&"
f"redirect_uri={redirect_uri}&"
f"scope=user:email"
)
async def github_exchange_code(self, code: str) -> str:
"""使用授权码换取GitHub访问令牌"""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://github.com/login/oauth/access_token",
headers={"Accept": "application/json"},
data={
"client_id": settings.GITHUB_CLIENT_ID,
"client_secret": settings.GITHUB_CLIENT_SECRET,
"code": code,
"redirect_uri": f"{settings.BASE_URL}/api/auth/callback/github"
}
)
response_data = response.json()
if "error" in response_data:
raise Exception(response_data["error_description"])
return response_data["access_token"]
async def get_github_user_info(self, access_token: str) -> Dict[str, Any]:
"""获取GitHub用户信息"""
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.github.com/user",
headers={"Authorization": f"token {access_token}"}
)
return response.json()
def create_or_update_user(
self,
provider: str,
provider_user_id: str,
username: str,
email: Optional[str] = None,
avatar_url: Optional[str] = None
) -> User:
"""创建或更新用户"""
# 查找OAuth账户
oauth_account = self.db.query(OAuthAccount).filter(
OAuthAccount.provider == provider,
OAuthAccount.provider_user_id == provider_user_id
).first()
if oauth_account:
# 更新现有用户
user = oauth_account.user
if email:
user.email = email
if avatar_url:
user.avatar_url = avatar_url
else:
# 查找或创建新用户
user = self.db.query(User).filter(
User.username == username
).first()
if not user:
# 创建新用户
user = User(
username=username,
email=email,
avatar_url=avatar_url,
is_active=True
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
# 创建OAuth账户关联
oauth_account = OAuthAccount(
provider=provider,
provider_user_id=provider_user_id,
user_id=user.id
)
self.db.add(oauth_account)
self.db.commit()
return user
def create_user_token(self, user: User) -> str:
"""为用户创建访问令牌"""
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return create_access_token(
subject=user.username, expires_delta=access_token_expires
)

# app/models/oauth_account.py
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from app.core.database import Base
class OAuthAccount(Base):
__tablename__ = "oauth_accounts"
id = Column(Integer, primary_key=True, index=True)
provider = Column(String(50), nullable=False) # e.g., "github"
provider_user_id = Column(String(100), nullable=False) # 用户在提供商处的ID
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# 关联关系
user = relationship("User", back_populates="oauth_accounts")
# 更新User模型
# from sqlalchemy.orm import relationship
#
# class User(Base):
# # ... 现有字段 ...
# avatar_url = Column(String(255), nullable=True)
# oauth_accounts = relationship("OAuthAccount", back_populates="user")

在集成OAuth2时,应遵循相应的安全最佳实践,其中最重要的一项是使用随机state参数防御CSRF攻击:
# CSRF保护示例
import secrets
from datetime import timedelta
import redis
from app.core.config import settings

redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
def generate_state_parameter() -> str:
"""生成CSRF状态参数"""
return secrets.token_urlsafe(32)
def store_state_parameter(session_id: str, state: str) -> None:
"""存储状态参数"""
redis_client.setex(
f"oauth_state:{session_id}",
timedelta(minutes=10),
state
)
def validate_state_parameter(session_id: str, state: str) -> bool:
"""验证状态参数"""
stored_state = redis_client.get(f"oauth_state:{session_id}")
if stored_state and stored_state == state:
# 验证后删除,防止重用
redis_client.delete(f"oauth_state:{session_id}")
return True
return False

API密钥认证是一种简单直观的认证方式,适用于系统集成场景。每个集成方会被分配一个唯一的API密钥,用于标识和认证请求。
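无论密钥如何存储,比较密钥时都应使用恒定时间算法,防止通过响应时间推测密钥前缀的时序攻击(以下为独立示意,密钥为随机生成的示例值):

```python
import secrets

api_key_stored = secrets.token_urlsafe(48)  # 服务端保存的密钥(示例)

def check_key(provided: str, stored: str) -> bool:
    # 恒定时间比较:耗时不随首个不匹配字符的位置变化
    return secrets.compare_digest(provided, stored)

print(check_key(api_key_stored, api_key_stored))  # True
print(check_key("wrong-key", api_key_stored))     # False
```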
# app/models/api_key.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.core.database import Base
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(100), unique=True, index=True, nullable=False)
name = Column(String(100), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
expires_at = Column(DateTime(timezone=True), nullable=True)
last_used_at = Column(DateTime(timezone=True), nullable=True)
# 使用次数统计
usage_count = Column(Integer, default=0)
# 关联关系
user = relationship("User", back_populates="api_keys")
# 更新User模型
# class User(Base):
# # ... 现有字段 ...
# api_keys = relationship("APIKey", back_populates="user")

# app/core/api_key.py
import secrets
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.api_key import APIKey
def generate_api_key() -> str:
"""生成安全的API密钥"""
# 生成64字符的随机密钥
return secrets.token_urlsafe(48)
def create_api_key(
db: Session,
user_id: int,
name: str,
expires_in: Optional[int] = None # 有效期(天)
) -> APIKey:
"""创建新的API密钥"""
key = generate_api_key()
expires_at = None
if expires_in:
expires_at = datetime.utcnow() + timedelta(days=expires_in)
api_key = APIKey(
key=key,
name=name,
user_id=user_id,
expires_at=expires_at
)
db.add(api_key)
db.commit()
db.refresh(api_key)
return api_key
def verify_api_key(db: Session, api_key: str) -> Optional[APIKey]:
"""验证API密钥"""
# 查找API密钥
key = db.query(APIKey).filter(APIKey.key == api_key).first()
if not key:
return None
# 检查是否激活
if not key.is_active:
return None
# 检查是否过期
if key.expires_at and datetime.utcnow() > key.expires_at:
return None
# 更新使用信息
key.last_used_at = datetime.utcnow()
key.usage_count += 1
db.commit()
return key

# app/api/deps.py
from typing import Optional
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import APIKeyHeader
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.api_key import APIKey
from app.core.api_key import verify_api_key
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def get_current_api_key(
api_key: Optional[str] = Security(api_key_header),
db: Session = Depends(get_db)
) -> APIKey:
"""获取当前API密钥"""
if not api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key required",
headers={"X-API-Key-Error": "missing"},
)
key = verify_api_key(db, api_key)
if not key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired API key",
headers={"X-API-Key-Error": "invalid"},
)
return key
async def get_current_user_from_api_key(
api_key: APIKey = Depends(get_current_api_key)
):
"""从API密钥获取当前用户"""
return api_key.user

限流是API服务保护的重要机制,可以防止滥用和DoS攻击。FastAPI中可以使用slowapi库实现限流:
# 安装依赖
# pip install slowapi
# app/core/rate_limiter.py
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# 基于IP的限流器
limiter = Limiter(key_func=get_remote_address)

# app/main.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 创建限流器
limiter = Limiter(key_func=get_remote_address)
# 创建FastAPI应用
app = FastAPI(...)
# 添加限流中间件
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# app/api/llm.py
from fastapi import APIRouter, Depends, HTTPException, status, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.api.deps import get_current_user, get_current_api_key
from app.schemas.llm import LLMRequest, LLMResponse
from app.services.llm_service import LLMService
router = APIRouter()
limiter = Limiter(key_func=get_remote_address)
llm_service = LLMService()
# 基于用户的LLM推理端点
@router.post("/generate", response_model=LLMResponse)
@limiter.limit("10/minute") # 限制每分钟10次请求
async def generate_text(
request: Request,  # slowapi要求被限流端点的签名中包含Request参数
request_data: LLMRequest,
current_user = Depends(get_current_user)
):
"""文本生成端点"""
try:
# 记录用户信息
request_data.user_id = current_user.id
# 调用LLM服务
result = await llm_service.generate_text(request_data)
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
# 基于API密钥的LLM推理端点
@router.post("/generate/api-key", response_model=LLMResponse)
@limiter.limit("100/minute") # 为API密钥设置不同的限流
async def generate_text_api_key(
request: Request,  # slowapi要求被限流端点的签名中包含Request参数
request_data: LLMRequest,
api_key = Depends(get_current_api_key)
):
"""基于API密钥的文本生成端点"""
try:
# 记录API密钥信息
request_data.api_key_id = api_key.id
request_data.user_id = api_key.user_id
# 调用LLM服务
result = await llm_service.generate_text(request_data)
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)

对于企业级应用,可以实现更复杂的限流策略:
# 分布式限流示例
import redis
import time
from typing import Optional
from app.core.config import settings
class DistributedRateLimiter:
def __init__(self):
self.redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
def is_allowed(
self,
key: str,
limit: int,
period: int
) -> bool:
"""检查请求是否允许
Args:
key: 限流键(如用户ID或API密钥)
limit: 时间窗口内的最大请求数
period: 时间窗口(秒)
Returns:
bool: 是否允许请求
"""
current_time = int(time.time())
window_start = current_time - period
# 使用管道减少Redis调用次数
pipe = self.redis_client.pipeline()
# 生成键名
rate_key = f"rate_limit:{key}"
# 删除过期的请求记录
pipe.zremrangebyscore(rate_key, 0, window_start)
# 获取当前窗口内的请求数
pipe.zcard(rate_key)
# 添加当前请求
pipe.zadd(rate_key, {str(current_time): current_time})
# 设置键过期时间
pipe.expire(rate_key, period)
# 执行所有命令
_, current_count, _, _ = pipe.execute()
# 检查是否超过限制
return current_count < limit

在LLM API中,输入验证尤为重要:恶意提示词可能触发提示注入攻击,超长输入会消耗大量推理资源,越界的采样参数则可能导致模型输出异常。
FastAPI通过Pydantic提供了强大的输入验证功能。
# app/schemas/llm.py
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, field_validator, model_validator
class LLMRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=10000)
model: str = Field(default="gpt2", min_length=1, max_length=50)
max_tokens: int = Field(default=100, ge=1, le=4096)
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
top_p: float = Field(default=1.0, ge=0.0, le=1.0)
n: int = Field(default=1, ge=1, le=5)
stop: Optional[List[str]] = None
# 内部字段,不通过API接收
user_id: Optional[int] = None
api_key_id: Optional[int] = None
@field_validator("stop")
@classmethod
def validate_stop(cls, v):
if v and len(v) > 4:
raise ValueError("stop参数最多支持4个字符串")
return v
@model_validator(mode="after")
def validate_parameters(self):
# 避免同时收紧temperature和top_p两种采样约束
if self.temperature < 1.0 and self.top_p < 1.0:
raise ValueError("temperature和top_p不应同时小于1.0")
return self
class LLMResponse(BaseModel):
id: str
object: str = "text_completion"
created: int
model: str
choices: List[Dict[str, Any]]
usage: Dict[str, int]

实现提示词和生成内容的安全过滤,防止有害内容的生成和传播:
# app/services/content_filter.py
from typing import Dict, Any, Tuple
import re
from app.core.config import settings
class ContentFilter:
def __init__(self):
# 初始化安全过滤器
self.initialize_filters()
def initialize_filters(self):
"""初始化内容过滤器"""
# 示例:加载敏感词列表
self.sensitive_patterns = [
# 这里可以加载敏感词模式
]
def check_prompt(self, prompt: str) -> Tuple[bool, str]:
"""检查提示词是否安全
Args:
prompt: 用户输入的提示词
Returns:
Tuple[bool, str]: (是否安全, 错误信息)
"""
# 检查长度
if len(prompt) > settings.MAX_PROMPT_LENGTH:
return False, f"提示词长度超过限制({settings.MAX_PROMPT_LENGTH}字符)"
# 检查敏感内容
for pattern in self.sensitive_patterns:
if re.search(pattern, prompt, re.IGNORECASE):
return False, "提示词包含不适当内容"
# 检查是否存在提示注入攻击
if self._detect_prompt_injection(prompt):
return False, "检测到潜在的提示注入攻击"
return True, ""
def check_generated_content(self, content: str) -> Tuple[bool, str]:
"""检查生成内容是否安全
Args:
content: 模型生成的内容
Returns:
Tuple[bool, str]: (是否安全, 错误信息)
"""
# 检查敏感内容
for pattern in self.sensitive_patterns:
if re.search(pattern, content, re.IGNORECASE):
return False, "生成内容包含不适当内容"
# 检查是否存在有害指导
if self._detect_harmful_instructions(content):
return False, "生成内容包含有害指导"
return True, ""
def _detect_prompt_injection(self, text: str) -> bool:
"""检测提示注入攻击"""
# 示例检测规则
injection_patterns = [
r"ignore previous",
r"forget previous",
r"system\s*:",
r"you are not",
r"override instructions",
]
for pattern in injection_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
def _detect_harmful_instructions(self, text: str) -> bool:
"""检测有害指导内容"""
# 示例检测规则
harmful_patterns = [
r"how to create a bomb",
r"how to hack",
r"how to steal",
]
for pattern in harmful_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False

# app/services/llm_service.py
from typing import Dict, Any, Optional
import time
import uuid
from app.schemas.llm import LLMRequest, LLMResponse
from app.services.content_filter import ContentFilter
class LLMService:
def __init__(self):
self.content_filter = ContentFilter()
# 初始化LLM模型
self.initialize_model()
def initialize_model(self):
"""初始化LLM模型"""
# 这里可以加载预训练模型
# 示例:使用Hugging Face Transformers
from transformers import AutoTokenizer, AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
self.model = AutoModelForCausalLM.from_pretrained("gpt2")
async def generate_text(self, request: LLMRequest) -> LLMResponse:
"""生成文本"""
# 内容安全检查
is_safe, error_msg = self.content_filter.check_prompt(request.prompt)
if not is_safe:
raise ValueError(error_msg)
# 处理生成请求
try:
# 记录开始时间
start_time = time.time()
# 准备生成参数
generate_params = {
"max_new_tokens": request.max_tokens,
"temperature": request.temperature,
"top_p": request.top_p,
"num_return_sequences": request.n,
# 注意:transformers的generate()不直接接受stop参数,停止词需通过StoppingCriteria自行实现
"pad_token_id": self.tokenizer.eos_token_id
}
# 进行文本生成
inputs = self.tokenizer(request.prompt, return_tensors="pt")
outputs = self.model.generate(**inputs, **generate_params)
# 处理生成结果
choices = []
for i in range(len(outputs)):
# 解码生成的文本
generated_text = self.tokenizer.decode(outputs[i], skip_special_tokens=True)
# 检查生成内容是否安全
is_safe, error_msg = self.content_filter.check_generated_content(generated_text)
if not is_safe:
generated_text = "[内容过滤] 生成内容包含不适当信息"
choices.append({
"text": generated_text,
"index": i,
"logprobs": None,
"finish_reason": "stop"
})
# 计算使用的tokens
prompt_tokens = len(inputs["input_ids"][0])
completion_tokens = sum(len(output) - prompt_tokens for output in outputs)
total_tokens = prompt_tokens + completion_tokens
# 构建响应
response = LLMResponse(
id=str(uuid.uuid4()),
object="text_completion",
created=int(time.time()),
model=request.model,
choices=choices,
usage={
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
)
# 记录使用情况
await self._record_usage(request, response)
return response
except Exception as e:
# 记录错误
await self._record_error(request, str(e))
raise
async def _record_usage(self, request: LLMRequest, response: LLMResponse):
"""记录API使用情况"""
# 这里可以实现使用情况记录逻辑
pass
async def _record_error(self, request: LLMRequest, error: str):
"""记录错误"""
# 这里可以实现错误记录逻辑
pass
async def health_check(self) -> Dict[str, Any]:
"""健康检查"""
try:
# 简单的模型推理测试
test_prompt = "Hello, world!"
inputs = self.tokenizer(test_prompt, return_tensors="pt")
outputs = self.model.generate(**inputs, max_new_tokens=5)
return {
"status": "healthy",
"model": "gpt2",
"response_time": "fast"
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
}

除了输入验证外,还可以采取以下措施加固端点安全:
实现统一的错误处理机制,避免暴露敏感信息:
# app/core/exceptions.py
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
async def http_exception_handler(request: Request, exc: HTTPException):
"""HTTP异常处理"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.status_code,
"message": exc.detail
}
},
)
async def general_exception_handler(request: Request, exc: Exception):
"""通用异常处理"""
# 记录详细错误
print(f"Unhandled exception: {exc}")
# 返回通用错误信息
return JSONResponse(
status_code=500,
content={
"error": {
"code": 500,
"message": "服务器内部错误,请稍后重试"
}
},
)
# 在main.py中注册
# app.add_exception_handler(HTTPException, http_exception_handler)
# app.add_exception_handler(StarletteHTTPException, http_exception_handler)
# app.add_exception_handler(Exception, general_exception_handler)

确保所有API通信都通过HTTPS进行:
# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
# 在生产环境中启用HTTPS重定向
if settings.ENVIRONMENT == "production":
app.add_middleware(HTTPSRedirectMiddleware)
@app.middleware("http")
async def ensure_https(request, call_next):
"""确保请求使用HTTPS"""
if settings.ENVIRONMENT == "production" and request.url.scheme != "https":
# 对于生产环境,重定向HTTP到HTTPS
url = request.url.replace(scheme="https")
return RedirectResponse(str(url), status_code=307)
response = await call_next(request)
return response

添加安全相关的响应头:
# app/main.py
from fastapi import FastAPI
from starlette.middleware import Middleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.gzip import GZipMiddleware
# 添加安全中间件
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=settings.ALLOWED_HOSTS,
)
# 添加压缩中间件
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.middleware("http")
async def add_security_headers(request, call_next):
"""添加安全响应头"""
response = await call_next(request)
# 添加安全头
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response

在LLM API服务中,监控系统扮演着至关重要的角色,用于掌握请求量、响应延迟、错误率以及令牌用量等关键指标。
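这些指标的采集思路可以用一个进程内的极简计数器示意(仅作教学演示,生产环境应使用Prometheus等成熟方案):

```python
from collections import defaultdict

class Metrics:
    """进程内的极简指标收集器(仅作示意)"""
    def __init__(self):
        self.request_count = defaultdict(int)    # 按路径统计请求数
        self.error_count = defaultdict(int)      # 按路径统计5xx错误数
        self.latency_sum = defaultdict(float)    # 按路径累计响应耗时

    def observe(self, path: str, status_code: int, latency: float) -> None:
        self.request_count[path] += 1
        self.latency_sum[path] += latency
        if status_code >= 500:
            self.error_count[path] += 1

    def avg_latency(self, path: str) -> float:
        n = self.request_count[path]
        return self.latency_sum[path] / n if n else 0.0

m = Metrics()
m.observe("/api/llm/generate", 200, 0.12)
m.observe("/api/llm/generate", 500, 0.30)
print(m.request_count["/api/llm/generate"], m.error_count["/api/llm/generate"])  # 2 1
```

在下文的请求日志中间件里,每次请求结束时调用observe()即可完成采集。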
配置结构化日志,便于分析和查询:
# app/core/logging.py
import logging
import sys
from typing import Union
from loguru import logger
from app.core.config import settings
class InterceptHandler(logging.Handler):
"""拦截标准库日志到Loguru"""
def emit(self, record: logging.LogRecord) -> None:
# 获取对应Loguru级别
level: Union[str, int] = record.levelno
if level >= logging.CRITICAL:
level = "CRITICAL"
elif level >= logging.ERROR:
level = "ERROR"
elif level >= logging.WARNING:
level = "WARNING"
elif level >= logging.INFO:
level = "INFO"
elif level >= logging.DEBUG:
level = "DEBUG"
else:
level = "TRACE"
# 获取调用上下文
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
# 记录日志
logger.opt(depth=depth, exception=record.exc_info).log(
level,
record.getMessage(),
)
def configure_logging() -> None:
    """Configure the logging system"""
    # Remove the default handler
    logger.remove()

    # Console output
    console_format = (
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
        "<level>{message}</level>"
    )
    logger.add(
        sys.stdout,
        level=settings.LOG_LEVEL,
        format=console_format,
        colorize=True,
    )

    # File output
    file_format = (
        "{time:YYYY-MM-DD HH:mm:ss.SSS} | "
        "{level: <8} | "
        "{name}:{function}:{line} | "
        "{message}"
    )
    logger.add(
        settings.LOG_FILE,
        level="DEBUG",
        format=file_format,
        rotation="1 day",
        retention="7 days",
        compression="zip",
    )

    # Structured (JSON) log file, for analysis
    logger.add(
        settings.JSON_LOG_FILE,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}",
        serialize=True,
        rotation="1 day",
        retention="30 days",
    )

    # Intercept standard-library logging
    logging.basicConfig(handlers=[InterceptHandler()], level=0)

    # Intercept third-party library loggers
    for logger_name in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
        uvicorn_logger = logging.getLogger(logger_name)
        uvicorn_logger.handlers = [InterceptHandler()]
        uvicorn_logger.setLevel(logging.INFO)

Log the details of every API request:
# app/core/middleware.py
import time
from typing import Callable
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import Response
from loguru import logger
class RequestLoggerMiddleware(BaseHTTPMiddleware):
    """Request logging middleware"""

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # Record the request start
        start_time = time.time()
        path = request.url.path
        method = request.method
        # request.client can be None (e.g. in tests), so guard the access
        client_ip = request.client.host if request.client else "unknown"

        # Log request info (excluding sensitive data)
        logger.info(
            "Request received",
            extra={
                "method": method,
                "path": path,
                "client_ip": client_ip,
            },
        )
        try:
            # Handle the request
            response = await call_next(request)
            # Compute processing time
            process_time = time.time() - start_time
            # Log response info
            logger.info(
                "Request completed",
                extra={
                    "method": method,
                    "path": path,
                    "status_code": response.status_code,
                    "process_time": process_time,
                },
            )
            # Expose processing time as a response header
            response.headers["X-Process-Time"] = str(process_time)
            return response
        except Exception as e:
            # Log the exception
            process_time = time.time() - start_time
            logger.error(
                "Request error",
                extra={
                    "method": method,
                    "path": path,
                    "error": str(e),
                    "process_time": process_time,
                },
                exc_info=True,
            )
            raise

Implement an audit log that records all critical operations:
# app/services/audit_service.py
from datetime import datetime
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from app.models.audit_log import AuditLog

class AuditService:
    def __init__(self, db: Session):
        # Inject the session rather than calling next(get_db()) here;
        # consuming the dependency generator manually leaks the session
        self.db = db

    def log_action(
        self,
        action_type: str,
        user_id: Optional[int] = None,
        api_key_id: Optional[int] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[int] = None,
        details: Optional[Dict[str, Any]] = None,
        success: bool = True,
    ) -> AuditLog:
        """Record an audited operation

        Args:
            action_type: operation type (e.g. login, create_api_key, llm_generate)
            user_id: user ID
            api_key_id: API key ID
            resource_type: resource type
            resource_id: resource ID
            details: additional details
            success: whether the operation succeeded

        Returns:
            AuditLog: the persisted audit record
        """
        log_entry = AuditLog(
            action_type=action_type,
            user_id=user_id,
            api_key_id=api_key_id,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            success=success,
            timestamp=datetime.utcnow(),
        )
        self.db.add(log_entry)
        self.db.commit()
        self.db.refresh(log_entry)
        return log_entry

    def get_user_audit_logs(
        self,
        user_id: int,
        limit: int = 100,
        offset: int = 0
    ) -> list:
        """Fetch audit logs for a user"""
        return (
            self.db.query(AuditLog)
            .filter(AuditLog.user_id == user_id)
            .order_by(AuditLog.timestamp.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )

    def get_api_key_audit_logs(
        self,
        api_key_id: int,
        limit: int = 100,
        offset: int = 0
    ) -> list:
        """Fetch audit logs for an API key"""
        return (
            self.db.query(AuditLog)
            .filter(AuditLog.api_key_id == api_key_id)
            .order_by(AuditLog.timestamp.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )

Collect key monitoring metrics with the Prometheus client:
# Install dependency:
# pip install prometheus_client

# app/core/metrics.py
from prometheus_client import Counter, Histogram, Gauge
from fastapi import FastAPI

# Metric definitions
REQUEST_COUNT = Counter(
"llm_api_request_count",
"Total number of API requests",
["endpoint", "method", "status_code"]
)
REQUEST_LATENCY = Histogram(
"llm_api_request_latency_seconds",
"API request latency in seconds",
["endpoint"]
)
LLM_TOKEN_COUNT = Counter(
"llm_token_count",
"Total number of tokens processed",
["type", "model"] # type: prompt, completion
)
ACTIVE_REQUESTS = Gauge(
"llm_api_active_requests",
"Number of active requests being processed"
)
ERROR_COUNT = Counter(
"llm_api_error_count",
"Number of API errors",
["endpoint", "error_type"]
)
def setup_metrics(app: FastAPI):
    """Wire up metrics collection on the FastAPI app"""
    from fastapi import Request
    from prometheus_client import make_asgi_app
    import time

    # Mount the Prometheus ASGI app
    metrics_app = make_asgi_app()
    app.mount("/metrics", metrics_app)

    @app.middleware("http")
    async def metrics_middleware(request: Request, call_next):
        """Middleware that records request metrics"""
        # Increment the active-request gauge
        ACTIVE_REQUESTS.inc()
        # Record the start time
        start_time = time.time()
        try:
            response = await call_next(request)
            # Compute latency
            latency = time.time() - start_time
            # Record request metrics
            endpoint = request.url.path
            REQUEST_COUNT.labels(
                endpoint=endpoint,
                method=request.method,
                status_code=response.status_code
            ).inc()
            REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)
            return response
        except Exception as e:
            # Record error metrics
            ERROR_COUNT.labels(
                endpoint=request.url.path,
                error_type=type(e).__name__
            ).inc()
            raise
        finally:
            # Decrement the active-request gauge
            ACTIVE_REQUESTS.dec()

# In main.py:
# from app.core.metrics import setup_metrics
# setup_metrics(app)
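The token counter defined above (`LLM_TOKEN_COUNT`) is only useful if the generation endpoint actually increments it. A minimal, self-contained sketch of that pattern follows; the token counts, helper name, and model name are illustrative, and a separate `CollectorRegistry` is used here only so the example stands alone:

```python
from prometheus_client import CollectorRegistry, Counter

# Separate registry so this example does not collide with the app's default registry
registry = CollectorRegistry()
token_count = Counter(
    "llm_token_count",
    "Total number of tokens processed",
    ["type", "model"],
    registry=registry,
)

def record_usage(prompt_tokens: int, completion_tokens: int, model: str) -> None:
    """Record prompt/completion token usage after each generation call."""
    token_count.labels(type="prompt", model=model).inc(prompt_tokens)
    token_count.labels(type="completion", model=model).inc(completion_tokens)

# Called once per generation request, e.g. from the /generate endpoint
record_usage(42, 128, "gpt2")
```

In the real service this call would sit right after the model returns, using the tokenizer's actual counts.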
## Chapter 8: Deployment Optimization and Containerization
### 8.1 Containerized Deployment Overview
Containerized deployment has become the standard for enterprise applications in 2025, and it matters especially for LLM API services. Containers provide environment consistency, fast deployment, and resource isolation, making LLM services easier to deploy and operate.
For LLM services, containerization addresses several key problems:
- **Environment consistency**: development, testing, and production environments stay identical
- **Resource management**: precise control over CPU, memory, and other resource allocations
- **Rapid scaling**: automatic scale-out and scale-in based on traffic
- **Simpler deployment pipelines**: easier CI/CD integration
### 8.2 Docker Configuration and Optimization
#### 8.2.1 Basic Dockerfile
# Base image: Python 3.11
FROM python:3.11-slim-bullseye
# Set the working directory
WORKDIR /app
# Environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    POETRY_VERSION=1.8.3
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libc6-dev \
    && rm -rf /var/lib/apt/lists/*
# Install Poetry
RUN pip install --upgrade pip && \
    pip install "poetry==$POETRY_VERSION"
# Configure Poetry
RUN poetry config virtualenvs.create false
# Copy pyproject.toml and poetry.lock
COPY pyproject.toml poetry.lock* ./
# Install dependencies (Poetry >= 1.2 uses --only main in place of the removed --no-dev)
RUN poetry install --no-root --only main
# Copy the application code
COPY . .
# Expose the port
EXPOSE 8000
# Run the application (--reload is for development only; drop it in production)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

To reduce image size and improve security, a multi-stage build is recommended:
# Stage 1: build environment
FROM python:3.11-slim-bullseye AS builder
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    POETRY_VERSION=1.8.3
# Install build-time system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libc6-dev \
    && rm -rf /var/lib/apt/lists/*
# Install Poetry
RUN pip install --upgrade pip && \
    pip install "poetry==$POETRY_VERSION"
# Configure Poetry
RUN poetry config virtualenvs.create false
# Copy pyproject.toml and poetry.lock
COPY pyproject.toml poetry.lock* ./
# Install dependencies (including dev dependencies, needed for the test run below)
RUN poetry install
# Copy the application code
COPY . .
# Run the tests
RUN poetry run pytest

# Stage 2: runtime environment
FROM python:3.11-slim-bullseye
WORKDIR /app
# Copy installed packages from the builder, plus /usr/local/bin so that
# console-script entry points (gunicorn, uvicorn) are available
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy the application code
COPY . .
# Create a non-root user
RUN useradd -m appuser
USER appuser
# Expose the port
EXPOSE 8000
# Run with gunicorn (instead of bare uvicorn) in production
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

Use Docker Compose to manage the multi-container stack (LLM API service, database, etc.):
version: '3.9'
services:
api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://admin:password@db:5432/example_db
- SECRET_KEY=${SECRET_KEY}
- LLM_MODEL_NAME=gpt2
depends_on:
- db
restart: always
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/health/liveness"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '0.5'
memory: 1G
db:
image: postgres:15-alpine
environment:
- POSTGRES_USER=admin
- POSTGRES_PASSWORD=password
- POSTGRES_DB=example_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
restart: always
healthcheck:
test: ["CMD-SHELL", "pg_isready -U admin -d example_db"]
interval: 10s
timeout: 5s
retries: 5
volumes:
  postgres_data:

Add to requirements.txt:
gunicorn==21.2.0
uvicorn[standard]==0.25.0
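The project layout in the introduction lists a `start.sh` launch script whose contents are not shown here. The following is a hedged sketch of what such a script might look like; the environment-variable names, worker count, and timeout are assumptions, and the final `exec` is left commented so the sketch is safe to run as-is:

```shell
#!/usr/bin/env bash
# start.sh — sketch of a production launch script (values are illustrative)
set -euo pipefail

WORKERS="${WORKERS:-4}"    # gunicorn worker processes
PORT="${PORT:-8000}"       # listen port
TIMEOUT="${TIMEOUT:-300}"  # generous timeout for long-running LLM requests

CMD="gunicorn app.main:app -k uvicorn.workers.UvicornWorker -w $WORKERS --bind 0.0.0.0:$PORT --timeout $TIMEOUT"
echo "$CMD"
# exec $CMD   # uncomment to actually launch the server
```

Driving the values through environment variables keeps the same script usable across the Docker Compose and Kubernetes deployments shown below.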
In production, placing a load balancer in front of the LLM API service is a key step toward high availability:
# haproxy.cfg example
global
log /dev/log local0
log /dev/log local1 notice
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
stats timeout 30s
user haproxy
group haproxy
daemon
maxconn 20000
defaults
log global
mode http
option httplog
option dontlognull
timeout connect 5000
timeout client 50000
timeout server 50000
errorfile 400 /etc/haproxy/errors/400.http
errorfile 403 /etc/haproxy/errors/403.http
errorfile 408 /etc/haproxy/errors/408.http
errorfile 500 /etc/haproxy/errors/500.http
errorfile 502 /etc/haproxy/errors/502.http
errorfile 503 /etc/haproxy/errors/503.http
errorfile 504 /etc/haproxy/errors/504.http
frontend llm_api_frontend
bind *:80
bind *:443 ssl crt /etc/ssl/certs/llm-api.pem
mode http
option forwardfor
http-request add-header X-Forwarded-Proto https if { ssl_fc }
default_backend llm_api_backend
backend llm_api_backend
mode http
balance roundrobin
option httpchk GET /api/health/liveness
http-check expect status 200
    timeout connect 2s
    timeout server 30s
    default-server inter 3s fall 3 rise 2 maxconn 500
    server api1 api-server1:8000 check
    server api2 api-server2:8000 check
    server api3 api-server3:8000 check
listen stats
bind *:9000
mode http
stats enable
stats uri /stats
    stats refresh 10s

Use Certbot to manage SSL certificates automatically:
# Install Certbot
sudo apt-get update
sudo apt-get install certbot python3-certbot-nginx -y
# Obtain a certificate for the LLM API service
sudo certbot --nginx -d llm-api.example.com

Configure Nginx as a reverse proxy:
# /etc/nginx/sites-available/llm-api
server {
listen 80;
server_name llm-api.example.com;
return 301 https://$host$request_uri;
}
server {
listen 443 ssl;
server_name llm-api.example.com;
ssl_certificate /etc/letsencrypt/live/llm-api.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/llm-api.example.com/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
    # Security headers
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
add_header Content-Security-Policy "default-src 'self'" always;
add_header X-Content-Type-Options nosniff;
add_header X-Frame-Options SAMEORIGIN;
add_header X-XSS-Protection "1; mode=block";
    # Proxy configuration
location / {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
        # Timeouts sized for long-running LLM requests
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
}
    # Health-check endpoint
location /api/health/liveness {
proxy_pass http://localhost:8000/api/health/liveness;
access_log off;
allow 127.0.0.1;
deny all;
}
}

Configure a CI/CD pipeline with GitHub Actions for automated testing, builds, and deployment:
# .github/workflows/deploy.yml
name: Deploy LLM API
on:
push:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pytest-cov httpx
pip install -e .
- name: Run tests
run: |
pytest --cov=app --cov-report=xml
build-and-push:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: llm-api:latest,llm-api:${{ github.sha }}
cache-from: type=registry,ref=llm-api:latest
cache-to: type=inline
deploy:
needs: build-and-push
runs-on: ubuntu-latest
steps:
- name: Deploy to production
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USERNAME }}
key: ${{ secrets.SSH_PRIVATE_KEY }}
script: |
cd /path/to/llm-api
docker-compose pull
docker-compose up -d
          docker image prune -f

For LLM services that need greater scalability and reliability, consider deploying on Kubernetes:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-api
spec:
replicas: 3
selector:
matchLabels:
app: llm-api
template:
metadata:
labels:
app: llm-api
spec:
containers:
- name: llm-api
image: llm-api:latest
ports:
- containerPort: 8000
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secrets
key: url
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: api-secrets
key: secret-key
- name: LLM_MODEL_NAME
value: "gpt2"
readinessProbe:
httpGet:
path: /api/health/readiness
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /api/health/liveness
port: 8000
initialDelaySeconds: 15
periodSeconds: 20
startupProbe:
httpGet:
path: /api/health/startup
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
failureThreshold: 30
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-api-service
spec:
selector:
app: llm-api
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: llm-api-ingress
annotations:
nginx.ingress.kubernetes.io/ssl-redirect: "true"
cert-manager.io/cluster-issuer: "letsencrypt-prod"
nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
nginx.ingress.kubernetes.io/proxy-body-size: "50m"
spec:
tls:
- hosts:
- llm-api.example.com
secretName: llm-api-tls
rules:
- host: llm-api.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: llm-api-service
port:
number: 80
#### 8.5.2 Kubernetes Best Practices
When deploying an LLM API service on Kubernetes, pay particular attention to the following best practices:
1. **Resource management**
   - Set sensible resource requests and limits for containers
   - Use ResourceQuota to cap resource usage per namespace
   - Use node affinity so that critical workloads land on suitable nodes
2. **State management**
   - Manage stateful LLM service components with StatefulSet
   - Manage model storage with PersistentVolumeClaim
   - Consider a distributed cache (such as Redis) to improve response times
3. **Service mesh integration**
   - Use Istio to manage inter-service communication, traffic control, and security
   - Configure circuit breakers to avoid cascading failures
   - Implement fine-grained traffic splitting and canary releases
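As a sketch of the ResourceQuota practice listed above, a namespace-level quota might look like the following; the namespace name and limit values are purely illustrative and should be sized to your cluster:

```yaml
# resource-quota.yaml (hypothetical namespace and limits)
apiVersion: v1
kind: ResourceQuota
metadata:
  name: llm-api-quota
  namespace: llm-api
spec:
  hard:
    requests.cpu: "8"
    requests.memory: 16Gi
    limits.cpu: "16"
    limits.memory: 32Gi
    pods: "20"
```

With this in place, the per-container requests/limits in the Deployment above are additionally bounded at the namespace level.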
### 8.6 Service Mesh and Istio Configuration
A service mesh is a core component of enterprise architectures in 2025, especially for a microservice-style LLM API deployment:
# istio-gateway.yaml
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: llm-api-gateway
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "llm-api.example.com"
tls:
httpsRedirect: true
- port:
number: 443
name: https
protocol: HTTPS
hosts:
- "llm-api.example.com"
tls:
mode: SIMPLE
credentialName: llm-api-tls
---
# istio-virtualservice.yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: llm-api-virtualservice
spec:
hosts:
- "llm-api.example.com"
gateways:
- llm-api-gateway
http:
- match:
- uri:
prefix: /api/health/
route:
- destination:
host: llm-api-service
port:
number: 80
timeout: 10s
- match:
- uri:
prefix: /api/v1/
route:
- destination:
host: llm-api-service
port:
number: 80
    timeout: 300s # LLM inference can take a long time
retries:
attempts: 3
perTryTimeout: 300s
retryOn: connect-failure,refused-stream,unavailable,cancelled
---
# istio-destinationrule.yaml
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: llm-api-destinationrule
spec:
host: llm-api-service
trafficPolicy:
connectionPool:
http:
http1MaxPendingRequests: 100
maxRequestsPerConnection: 10
maxRetries: 3
tcp:
maxConnections: 1000
outlierDetection:
consecutive5xxErrors: 5
interval: 30s
baseEjectionTime: 30s
      maxEjectionPercent: 50

When deploying and operating an LLM API service, you will inevitably run into errors. The following are the common failure classes for LLM API services in 2025 and how to triage them.
Symptom: clients cannot connect to the API service.

Triage steps:
1. Check that the service process is running: `systemctl status llm-api` or `docker ps`
2. Confirm the port is listening: `netstat -tlnp | grep 8000`
3. Inspect the firewall: `iptables -L`, or the cloud provider's security-group rules
4. Read the service logs: `journalctl -u llm-api` or `docker logs llm-api`

Common fixes typically include restarting the service, opening the port in the firewall or security group, and binding to 0.0.0.0 rather than 127.0.0.1.
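The listening-port check above can also be scripted. This small helper is hypothetical (not part of the app's codebase) and uses only the standard library:

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Probe the local API port; True only if something is listening on 8000
print(port_is_open("127.0.0.1", 8000))
```

Such a probe is handy in smoke tests and deployment scripts before running deeper health checks.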
Symptom: the API returns 401 Unauthorized.

Triage steps: verify that the token or API key is present and well-formed in the Authorization header, check that it has not expired or been revoked, and confirm the server's SECRET_KEY matches the one used to sign the token.

Common fixes typically include re-issuing the token, synchronizing SECRET_KEY across instances, and ensuring clients send `Authorization: Bearer <token>` exactly.
Symptom: the API returns 500 Internal Server Error related to model loading or inference.

Triage steps:
1. Check GPU health: `nvidia-smi`
2. Check available memory: `free -h` or `kubectl top pods`
3. Review application logs for out-of-memory or model-file errors

Common fixes typically include reducing batch size or context length, allocating more memory to the container, and verifying the model weights path and version.
Monitor the LLM API's performance metrics with Prometheus and Grafana:
# app/core/performance.py
from prometheus_client import Histogram, Counter, Gauge
import time
from functools import wraps

# Request latency metric
# (named differently from llm_api_request_latency_seconds in app/core/metrics.py;
# registering the same metric name twice in one process raises a ValueError)
REQUEST_LATENCY = Histogram(
    'llm_api_endpoint_latency_seconds',
    'Request latency in seconds',
    ['endpoint', 'method']
)

# Model inference latency metric
MODEL_INFERENCE_LATENCY = Histogram(
    'llm_model_inference_latency_seconds',
    'Model inference latency in seconds',
    ['model_name']
)

# Token processing counter
TOKEN_PROCESSING = Counter(
    'llm_token_processing_total',
    'Token processing count',
    ['operation', 'model_name']
)

# Memory usage gauge
MEMORY_USAGE = Gauge(
    'llm_api_memory_usage_bytes',
    'Memory usage in bytes'
)
# Performance measurement decorator
def measure_performance(endpoint_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            response = await func(*args, **kwargs)
            end_time = time.time()
            # kwargs.get('request', {}) would yield a dict with no .method
            # attribute; fetch the Request object and guard against None
            request = kwargs.get('request')
            method = request.method if request is not None else "UNKNOWN"
            REQUEST_LATENCY.labels(
                endpoint=endpoint_name,
                method=method
            ).observe(end_time - start_time)
            return response
        return wrapper
    return decorator

Grafana dashboards: build panels for the key performance indicators defined above (request latency, inference latency, token throughput, memory usage).
Use memory-profiling tools to detect and fix memory leaks:
# Install profiling tools:
# pip install memory-profiler psutil

# memory_profile.py
from memory_profiler import profile

@profile
def analyze_llm_response(prompt, model):
    # LLM inference logic
    response = model.generate(prompt)
    return response

# Integration in the app:
# from app.core.memory_profile import analyze_llm_response
# response = analyze_llm_response(prompt, model)

Docker memory-limit tuning:
# docker-compose.yml
version: '3.9'
services:
api:
# ...
deploy:
resources:
limits:
cpus: '2'
memory: 3G
reservations:
cpus: '0.5'
memory: 1G
    # Soft memory-limit warning threshold (read by the app; Docker does not enforce it)
    environment:
      - MEMORY_LIMIT_WARNING=2.5G

# app/core/database.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import os
import threading
import time

DATABASE_URL = os.getenv("DATABASE_URL")

# Tuned connection-pool configuration
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=10,        # pool size
    max_overflow=20,     # max overflow connections
    pool_timeout=30,     # seconds to wait for a connection
    pool_recycle=1800,   # recycle connections after this many seconds
    pool_pre_ping=True,  # validate connections before use
    echo=False           # disable SQL logging
)

# Connection-pool monitoring metrics
from prometheus_client import Gauge
DB_CONNECTION_POOL_SIZE = Gauge(
    'llm_api_db_connection_pool_size',
    'Database connection pool size'
)
DB_CONNECTION_POOL_USED = Gauge(
    'llm_api_db_connection_pool_used',
    'Database connection pool used count'
)

# Periodically refresh the pool metrics
def update_db_pool_metrics():
    while True:
        DB_CONNECTION_POOL_SIZE.set(engine.pool.size())
        # checkedout() counts connections in use; checkedin() counts idle ones
        DB_CONNECTION_POOL_USED.set(engine.pool.checkedout())
        time.sleep(10)

# Start the monitoring thread
threading.Thread(target=update_db_pool_metrics, daemon=True).start()

Symptom: database connection errors or query timeouts.
Triage steps: confirm the database is up (`systemctl status postgresql` or `docker ps | grep postgres`), verify DATABASE_URL, and watch for pool exhaustion via the metrics above.

Common solutions include adding retry logic around transient failures:
# Retry decorator for database operations
from functools import wraps
import asyncio
from sqlalchemy.exc import SQLAlchemyError

def retry_db_operation(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except SQLAlchemyError:
                    retries += 1
                    if retries >= max_retries:
                        raise
                    print(f"Database operation failed, retrying ({retries}/{max_retries})...")
                    # Exponential backoff; asyncio.sleep avoids blocking the event loop
                    await asyncio.sleep(delay * (2 ** (retries - 1)))
        return wrapper
    return decorator

A 2025 best practice is to run security audits and vulnerability scans regularly:
# Scan dependencies for known vulnerabilities
pip install safety
safety check
# Scan Python code for security issues
pip install bandit
bandit -r app/
# Scan the Docker image (the old docker scan command was retired in favor of Docker Scout)
docker scout cves llm-api:latest

Common web vulnerabilities to defend against:
1. SQL injection
2. XSS (cross-site scripting)
3. CSRF (cross-site request forgery)
4. Sensitive-data exposure
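For the first item, the standard defense is parameterized queries instead of string interpolation. A minimal stdlib sketch (the table and input are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, role TEXT)")
conn.execute("INSERT INTO users VALUES ('alice', 'admin'), ('bob', 'user')")

# Attacker-controlled input
user_input = "' OR '1'='1"

# Unsafe: string interpolation lets the input rewrite the query
unsafe_sql = f"SELECT role FROM users WHERE name = '{user_input}'"
leaked = conn.execute(unsafe_sql).fetchall()   # matches every row

# Safe: a bound parameter is treated as data, never as SQL
safe = conn.execute(
    "SELECT role FROM users WHERE name = ?", (user_input,)
).fetchall()                                   # matches no rows

print(len(leaked), len(safe))
```

SQLAlchemy's ORM queries used throughout this article bind parameters the same way, which is why the service code above avoids raw string-built SQL.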
As large language model technology evolves rapidly, best practices for LLM API services continue to shift. Below is a summary of current industry-leading practice in 2025, starting with response caching:
# app/core/caching.py
import redis.asyncio as redis
import json
import os
from functools import wraps
from typing import Callable, Optional, Any
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
class CacheService:
def __init__(self):
self.redis_client: Optional[redis.Redis] = None
async def connect(self):
self.redis_client = await redis.from_url(REDIS_URL, decode_responses=True)
async def disconnect(self):
if self.redis_client:
await self.redis_client.close()
async def get(self, key: str) -> Optional[Any]:
if not self.redis_client:
await self.connect()
value = await self.redis_client.get(key)
if value:
return json.loads(value)
return None
async def set(self, key: str, value: Any, expire: int = 3600) -> None:
if not self.redis_client:
await self.connect()
await self.redis_client.setex(key, expire, json.dumps(value))
async def delete(self, key: str) -> None:
if not self.redis_client:
await self.connect()
await self.redis_client.delete(key)
# Global cache-service instance
cache_service = CacheService()

# Caching decorator
def cache_response(expire: int = 3600):
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key
            # Note: simplified; production code needs a sturdier key strategy
            # (e.g. hashing, and excluding non-deterministic arguments)
            cache_key = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
            # Try the cache first
            cached_result = await cache_service.get(cache_key)
            if cached_result is not None:
                # Record a cache hit
                from app.core.metrics import CACHE_HIT_COUNTER
                CACHE_HIT_COUNTER.inc()
                return cached_result
            # Execute the wrapped function
            result = await func(*args, **kwargs)
            # Cache the result
            await cache_service.set(cache_key, result, expire)
            # Record a cache miss
            from app.core.metrics import CACHE_MISS_COUNTER
            CACHE_MISS_COUNTER.inc()
            return result
        return wrapper
    return decorator

In 2025, as model scale and complexity grow, cost optimization has become a central challenge in running LLM API services. Predictive autoscaling is one effective strategy:
# app/core/autoscaling.py
import asyncio
import time
from typing import Dict, List
class PredictiveScaling:
def __init__(self, min_instances: int = 2, max_instances: int = 10):
self.min_instances = min_instances
self.max_instances = max_instances
self.current_instances = min_instances
self.historical_load: List[Dict] = []
    async def collect_metrics(self) -> Dict:
        """Collect system load metrics"""
        # A real implementation would pull live metrics from the monitoring system
        return {
            "timestamp": time.time(),
            "cpu_usage": 0.75,        # example value
            "memory_usage": 0.82,     # example value
            "request_rate": 120,      # example value - requests per minute
            "average_latency": 0.35   # example value - average latency (seconds)
        }

    async def predict_load(self) -> float:
        """Predict future load"""
        # A real implementation would use a proper forecasting method,
        # e.g. time-series analysis or a learned model
        if not self.historical_load:
            return 0.5  # default prediction
        # Simple example: average of the 5 most recent data points
        recent_loads = self.historical_load[-5:]
        avg_request_rate = sum(item["request_rate"] for item in recent_loads) / len(recent_loads)
        # Normalize to a 0-1 load prediction
        max_observed_rate = max(item["request_rate"] for item in self.historical_load)
        return min(1.0, avg_request_rate / max_observed_rate * 1.2)  # 20% headroom

    async def adjust_capacity(self) -> int:
        """Adjust system capacity"""
        # Collect current metrics
        current_metrics = await self.collect_metrics()
        self.historical_load.append(current_metrics)
        # Predict future load
        predicted_load = await self.predict_load()
        # Compute the target instance count
        target_instances = int(self.min_instances + predicted_load * (self.max_instances - self.min_instances))
        target_instances = max(self.min_instances, min(self.max_instances, target_instances))
        # A real implementation would call the cloud provider or Kubernetes API here
        self.current_instances = target_instances
        print(f"Scaling to {target_instances} instances (predicted load: {predicted_load:.2f})")
        return target_instances

The LLM API field is changing rapidly in 2025, with several technology trends worth watching.
Based on current trends and best practices, the evolution roadmap for LLM API architecture follows the stages covered in this chapter: containerization, orchestration, service mesh, and predictive autoscaling.
As large language model technology matures, building high-performance, secure, and reliable LLM API services has become key infrastructure for enterprise digital transformation. By adopting the best practices and techniques presented in this article, you can build a modern LLM API architecture that meets business needs in 2025 and beyond.
When building an LLM API service, keep security, observability, and scalability as core principles throughout.
By combining FastAPI's performance characteristics with modern security architecture practices, you can build LLM API services that are both efficient and secure, creating lasting value for businesses and users.
In the coming years, as edge computing, privacy-preserving technology, and adaptive systems advance, LLM API services will become more intelligent, personalized, and secure. By tracking these trends and continually evolving your architecture, you can keep your LLM API service competitive and deliver an excellent user experience.