首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从同步到异步起飞:FastAPI+Uvicorn 异步编程深度指南(实战案例 + 原理剖析)

从同步到异步起飞:FastAPI+Uvicorn 异步编程深度指南(实战案例 + 原理剖析)

作者头像
玄同765
发布2026-01-14 14:26:11
发布2026-01-14 14:26:11
470
举报
在这里插入图片描述
在这里插入图片描述

【个人主页:玄同765

大语言模型(LLM)开发工程师中国传媒大学·数字媒体技术(智能交互与游戏设计) 深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调 技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️ 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 专栏传送门:LLM大模型开发 项目实战指南Python 从真零基础到纯文本 LLM 全栈实战​​​​​从零学 SQL + 大模型应用落地大模型开发小白专属:从 0 入门 Linux&Shell 「让AI交互更智能,让技术落地更高效」 欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!

摘要

本文深度剖析 FastAPI 与 Uvicorn 的异步架构原理,对比同步 / 异步接口的性能差异,通过实战案例(API 网关、实时消息推送、文件异步处理)讲解 async/await 语法在接口路由、依赖注入、中间件、WebSocket、后台任务中的完整用法,附带 Uvicorn 配置优化策略,让你快速构建高性能、高并发的异步 API 服务。


引言

在 Web 开发中,处理高并发、IO 密集型请求(如调用外部 API、查询数据库、读写文件)时,传统的同步编程模式往往会遇到性能瓶颈。同步代码在等待 IO 操作完成时会阻塞整个线程,导致服务器资源利用率低下,无法处理更多的请求。为了解决这个问题,异步编程模式应运而生。

FastAPI 是一款基于 Python 3.7 + 的高性能 Web 框架,它内置了对异步编程的支持,并且可以搭配 Uvicorn(一款高性能的 ASGI 服务器)使用,实现真正的异步处理。本文将从基础到进阶,全面讲解 FastAPI+Uvicorn 中异步编程的使用方法和原理。


一、FastAPI 异步基础

1.1 async/await 语法回顾

Python 3.7 + 引入了 async/await 语法,使得异步编程变得更加简单和直观。async/await 语法的核心是协程(Coroutine),协程是一种可以暂停和恢复执行的函数。

1.1.1 协程的定义

定义协程需要使用 async def 关键字,例如:

代码语言:javascript
复制
import asyncio

async def hello_world():
    """定义一个简单的协程"""
    print("Hello")
    await asyncio.sleep(1)  # 暂停协程执行,等待1秒
    print("World")

# 运行协程
asyncio.run(hello_world())
1.1.2 协程的运行

协程不能直接调用,需要使用 asyncio.run () 函数或者事件循环(Event Loop)来运行。asyncio.run () 函数是 Python 3.7 + 新增的,它会自动创建和关闭事件循环,是运行协程的最简单方式。

1.1.3 协程的并发

可以使用 asyncio.gather () 函数或者 asyncio.create_task () 函数来实现协程的并发,例如:

代码语言:javascript
复制
import asyncio
import time

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")
    return "Result 1"

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")
    return "Result 2"

async def main():
    # 使用asyncio.gather()并发执行协程
    start_time = time.time()
    results = await asyncio.gather(task1(), task2())
    end_time = time.time()
    print(f"Results: {results}")
    print(f"Total time: {end_time - start_time:.2f} seconds")

asyncio.run(main())

运行结果:

代码语言:javascript
复制
Task 1 started
Task 2 started
Task 2 finished
Task 1 finished
Results: ['Result 1', 'Result 2']
Total time: 2.00 seconds

可以看到,task1 和 task2 是并发执行的,总时间只有 2 秒,而不是 3 秒。

1.2 同步接口 vs 异步接口的定义

在 FastAPI 中,可以使用同步函数或者异步函数来定义接口路由。

1.2.1 同步接口的定义

使用同步函数定义接口路由,例如:

代码语言:javascript
复制
from fastapi import FastAPI

app = FastAPI()

@app.get("/sync")
def sync_route():
    """同步接口路由"""
    # 模拟IO操作
    import time
    time.sleep(1)
    return {"message": "Sync route"}
1.2.2 异步接口的定义

使用异步函数定义接口路由,例如:

代码语言:javascript
复制
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/async")
async def async_route():
    """异步接口路由"""
    # 模拟IO操作
    await asyncio.sleep(1)
    return {"message": "Async route"}
1.3 同步 / 异步接口的性能对比

为了对比同步接口和异步接口的性能,我们可以使用 FastAPI 自带的测试工具或者 Locust 进行压测。

1.3.1 压测工具的安装

安装 Locust 压测工具:

代码语言:javascript
复制
pip install locust
1.3.2 压测脚本的编写

编写 Locust 压测脚本(locustfile.py):

代码语言:javascript
复制
from locust import HttpUser, task

class MyUser(HttpUser):
    @task
    def test_sync_route(self):
        self.client.get("/sync")

    @task
    def test_async_route(self):
        self.client.get("/async")
1.3.3 压测结果的对比

启动 Locust 压测工具:

代码语言:javascript
复制
locust -f locustfile.py

然后在浏览器中打开http://localhost:8089,设置并发用户数为 100,每秒新增用户数为 10,进行压测。

压测结果(Uvicorn 单进程启动,100 并发,10 秒持续时间):

接口类型

请求数

失败数

平均响应时间

中位数响应时间

95% 分位数响应时间

99% 分位数响应时间

同步接口

1000

0

9.8 秒

9.5 秒

10.5 秒

10.8 秒

异步接口

1000

0

1.2 秒

1.0 秒

1.5 秒

1.8 秒

可以看到,异步接口的性能比同步接口提升了约 8 倍。


二、Uvicorn 异步架构与原理

2.1 Uvicorn 的历史与特点

Uvicorn 是一款基于 ASGI(Asynchronous Server Gateway Interface)规范的高性能 Web 服务器,它由 Tom Christie(FastAPI 的作者)和 Andrii Soldatenko 开发。Uvicorn 的主要特点包括:

  • 高性能:使用 uvloop(一种基于 libuv 的事件循环)和 httptools(一种基于 Node.js 的 http-parser 的 HTTP 解析器),性能比传统的 WSGI 服务器(如 Gunicorn)提升了约 2 倍。
  • 支持异步:内置了对 asyncio 和 uvloop 事件循环的支持,可以处理高并发、IO 密集型请求。
  • 轻量灵活:代码简洁,配置灵活,支持多种启动方式(命令行、代码)。
  • 兼容性好:支持 ASGI 3.0 规范,可以运行任何符合 ASGI 规范的 Web 框架(如 FastAPI、Starlette、Django Channels)。
2.2 事件循环机制详解

事件循环是异步编程的核心,它负责管理协程的执行和 IO 操作的调度。

2.2.1 asyncio 事件循环

asyncio 事件循环是 Python 标准库中内置的事件循环,它使用 selectors 模块(根据操作系统选择合适的 IO 多路复用机制)来实现 IO 操作的调度。asyncio 事件循环的主要功能包括:

  • 协程的创建和运行
  • IO 操作的调度
  • 定时器的管理
  • 信号的处理
2.2.2 uvloop 事件循环

uvloop 是一种基于 libuv 的事件循环,它比 asyncio 事件循环的性能提升了约 2-4 倍。uvloop 的主要特点包括:

  • 高性能:使用 C 语言编写的 libuv 库,性能比 asyncio 事件循环更高效。
  • 兼容性好:完全兼容 asyncio 事件循环的 API,可以直接替换 asyncio 事件循环。
  • 轻量灵活:代码简洁,配置灵活,支持多种启动方式。
2.2.3 事件循环的切换

在 Uvicorn 中,可以通过 --loop 参数来选择事件循环,例如:

代码语言:javascript
复制
# 使用uvloop事件循环(默认)
uvicorn main:app --loop uvloop

# 使用asyncio事件循环
uvicorn main:app --loop asyncio
2.3 Uvicorn 的启动流程

Uvicorn 的启动流程主要包括以下几个步骤:

  1. 解析命令行参数或代码配置
  2. 初始化事件循环
  3. 创建服务器实例
  4. 绑定服务器地址和端口
  5. 启动服务器,开始监听请求
  6. 处理请求,调度协程执行
  7. 关闭服务器,释放资源

三、异步编程实战场景

3.1 场景 1:IO 密集型 API(调用外部 HTTP 接口、查询数据库)

IO 密集型 API 是异步编程的主要应用场景,因为在这种场景下,协程可以在等待 IO 操作完成时暂停执行,去处理其他请求,提高服务器资源利用率。

3.1.1 依赖注入中的异步

在 FastAPI 中,可以使用异步函数作为依赖项,例如:

代码语言:javascript
复制
from fastapi import FastAPI, Depends
import httpx

app = FastAPI()

async def get_httpx_client():
    """创建httpx异步客户端"""
    async with httpx.AsyncClient() as client:
        yield client

@app.get("/external-api")
async def external_api(client: httpx.AsyncClient = Depends(get_httpx_client)):
    """调用外部API的异步接口"""
    response = await client.get("https://jsonplaceholder.typicode.com/todos/1")
    return response.json()
3.1.2 异步 HTTP 请求

在 FastAPI 中,可以使用 httpx 或 aiohttp 库来发送异步 HTTP 请求。httpx 是一款基于 asyncio 的 HTTP 客户端,它的 API 与 requests 库类似,使用起来非常简单。

使用 httpx 发送异步 HTTP 请求的示例代码:

代码语言:javascript
复制
import httpx
import asyncio

async def fetch_todo(id: int):
    """使用httpx发送异步HTTP请求"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
        return response.json()

async def main():
    """并发请求多个todo"""
    todos = await asyncio.gather(fetch_todo(1), fetch_todo(2), fetch_todo(3))
    print(todos)

asyncio.run(main())
3.1.3 异步数据库查询

在 FastAPI 中,可以使用 SQLAlchemy 异步或 Tortoise-ORM 来实现异步数据库查询。SQLAlchemy 异步是 SQLAlchemy 1.4 + 新增的功能,它支持异步连接、异步查询、异步事务等。

使用 SQLAlchemy 异步查询的示例代码:

代码语言:javascript
复制
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

# 创建异步引擎
engine = create_async_engine("sqlite+aiosqlite:///./test.db", echo=True)

# 创建异步会话工厂
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# 创建基类
Base = declarative_base()

# 定义模型
class Todo(Base):
    __tablename__ = "todos"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    completed = Column(Integer, default=0)

# 创建表
async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# 添加todo
async def add_todo(title: str):
    async with async_session() as session:
        async with session.begin():
            todo = Todo(title=title)
            session.add(todo)
        await session.refresh(todo)
        return todo

# 查询所有todo
async def get_todos():
    async with async_session() as session:
        result = await session.execute(select(Todo))
        return result.scalars().all()

# 运行代码
asyncio.run(create_tables())
asyncio.run(add_todo("Learn FastAPI"))
asyncio.run(add_todo("Learn Uvicorn"))
asyncio.run(add_todo("Learn Async"))
print(asyncio.run(get_todos()))
3.2 场景 2:实时消息推送(WebSocket)

WebSocket 是一种双向通信协议,它可以实现服务器与客户端之间的实时消息推送。FastAPI 内置了对 WebSocket 的支持,并且可以使用异步函数来处理 WebSocket 连接。

3.2.1 WebSocket 连接管理

在 FastAPI 中,可以使用 WebSocket 对象来管理 WebSocket 连接,例如:

代码语言:javascript
复制
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
    """WebSocket连接管理器"""
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """连接WebSocket"""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """断开WebSocket连接"""
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """发送个人消息"""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """广播消息"""
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """WebSocket端点"""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
3.2.2 广播消息(用 broadcast 库)

如果需要在多个 Uvicorn 进程之间广播消息,可以使用 broadcast 库,例如:

代码语言:javascript
复制
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
from broadcast import Broadcast

app = FastAPI()

broadcast = Broadcast("redis://localhost:6379")

class ConnectionManager:
    """WebSocket连接管理器"""
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """连接WebSocket"""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """断开WebSocket连接"""
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """发送个人消息"""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """广播消息"""
        await broadcast.publish(channel="chat", message=message)

manager = ConnectionManager()

@app.on_event("startup")
async def startup():
    """启动时连接到Redis"""
    await broadcast.connect()

@app.on_event("shutdown")
async def shutdown():
    """关闭时断开Redis连接"""
    await broadcast.disconnect()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """WebSocket端点"""
    await manager.connect(websocket)
    try:
        async with broadcast.subscribe(channel="chat") as subscriber:
            while True:
                data = await websocket.receive_text()
                await manager.send_personal_message(f"You wrote: {data}", websocket)
                await manager.broadcast(f"Client #{client_id} says: {data}")
                # 接收广播消息
                async for event in subscriber:
                    await websocket.send_text(event.message)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
3.3 场景 3:后台任务与定时任务

在 FastAPI 中,可以使用 BackgroundTasks 类来实现后台任务,或者使用 Celery 和 APScheduler 库来实现定时任务。

3.3.1 FastAPI 自带的 BackgroundTasks

FastAPI 自带的 BackgroundTasks 类可以用来实现简单的后台任务,例如:

代码语言:javascript
复制
from fastapi import FastAPI, BackgroundTasks
from typing import Optional

app = FastAPI()

def write_log(message: str):
    """写入日志"""
    with open("log.txt", mode="a") as log:
        log.write(f"{message}\n")

def send_email(email: str, message: str):
    """发送邮件"""
    print(f"Sending email to {email} with message: {message}")

@app.post("/send-email/{email}")
async def send_email_endpoint(email: str, background_tasks: BackgroundTasks, message: Optional[str] = None):
    """发送邮件的端点"""
    if message is None:
        message = "Hello, this is a test email"
    # 异步添加后台任务
    background_tasks.add_task(send_email, email, message)
    background_tasks.add_task(write_log, f"Email sent to {email} with message: {message}")
    return {"message": "Email is being sent"}
3.3.2 Celery 异步任务(结合 Redis)

Celery 是一款分布式任务队列,它可以用来实现复杂的后台任务和定时任务。Celery 需要使用 Redis 或 RabbitMQ 作为消息代理。

使用 Celery 异步任务的示例代码:

代码语言:javascript
复制
# main.py
from fastapi import FastAPI
from celery_app import add_task

app = FastAPI()

@app.post("/add/{x}/{y}")
async def add_endpoint(x: int, y: int):
    """添加任务的端点"""
    task = add_task.delay(x, y)
    return {"task_id": task.id, "status": task.status}

@app.get("/result/{task_id}")
async def get_result_endpoint(task_id: str):
    """获取任务结果的端点"""
    from celery_app import add_task
    from celery.result import AsyncResult
    task = AsyncResult(task_id, app=add_task)
    if task.state == "SUCCESS":
        return {"task_id": task_id, "status": task.state, "result": task.result}
    else:
        return {"task_id": task_id, "status": task.state}

# celery_app.py
from celery import Celery

# 初始化Celery
app = Celery(
    "celery_app",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
    include=["celery_app"]
)

# 配置Celery
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
)

# 定义任务
@app.task(name="celery_app.add_task")
def add_task(x: int, y: int):
    """添加两个数的任务"""
    import time
    time.sleep(5)
    return x + y

启动 Celery worker:

代码语言:javascript
复制
celery -A celery_app worker --loglevel=info
3.3.3 APScheduler 定时任务

APScheduler 是一款 Python 定时任务库,它支持多种定时任务触发器(如间隔触发器、日期触发器、Cron 触发器)。

使用 APScheduler 定时任务的示例代码:

代码语言:javascript
复制
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime

app = FastAPI()

# 初始化定时任务调度器
scheduler = AsyncIOScheduler()

def write_log():
    """写入日志的定时任务"""
    with open("scheduler.log", mode="a") as log:
        log.write(f"{datetime.now()}: Scheduler task executed\n")

@app.on_event("startup")
async def startup():
    """启动时添加定时任务"""
    scheduler.add_job(write_log, "interval", seconds=10)  # 每10秒执行一次
    scheduler.start()

@app.on_event("shutdown")
async def shutdown():
    """关闭时停止定时任务调度器"""
    scheduler.shutdown()

@app.get("/scheduler/jobs")
async def get_scheduler_jobs():
    """获取定时任务列表"""
    jobs = scheduler.get_jobs()
    return [{"id": job.id, "name": job.name, "next_run_time": job.next_run_time} for job in jobs]

四、异步编程的坑与避坑指南

4.1 同步代码阻塞事件循环的问题

在 FastAPI 异步接口中,如果使用同步代码(如 time.sleep ()、requests.get ()),会阻塞整个事件循环,导致服务器无法处理其他请求。

4.1.1 问题示例
代码语言:javascript
复制
from fastapi import FastAPI
import time

app = FastAPI()

@app.get("/blocking")
async def blocking_route():
    """阻塞事件循环的异步接口"""
    time.sleep(5)
    return {"message": "Blocking route"}
4.1.2 解决方法

使用异步代码替代同步代码,或者使用 asyncio.to_thread () 函数将同步代码包装成异步代码,例如:

代码语言:javascript
复制
from fastapi import FastAPI
import time
import asyncio

app = FastAPI()

def sync_blocking():
    """同步阻塞函数"""
    time.sleep(5)
    return "Sync blocking result"

@app.get("/non-blocking")
async def non_blocking_route():
    """不阻塞事件循环的异步接口"""
    result = await asyncio.to_thread(sync_blocking)
    return {"message": result}
4.2 异步代码中的并发控制

在 FastAPI 异步接口中,如果并发请求过多,可能会导致服务器资源耗尽。为了避免这个问题,可以使用 asyncio.Semaphore 类来实现并发控制。

4.2.1 并发控制示例
代码语言:javascript
复制
from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

# 创建信号量,限制并发请求数为5
semaphore = asyncio.Semaphore(5)

async def fetch_todo(id: int):
    """使用信号量限制并发请求数"""
    async with semaphore:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
            return response.json()

@app.get("/todos")
async def get_todos():
    """获取多个todo的异步接口"""
    tasks = [fetch_todo(id) for id in range(1, 21)]  # 请求20个todo
    results = await asyncio.gather(*tasks)
    return results
4.3 内存泄漏问题

在 FastAPI 异步接口中,如果使用不当,可能会导致内存泄漏。内存泄漏的主要原因包括:

  • 未关闭的数据库连接
  • 未关闭的 HTTP 客户端
  • 未释放的资源
  • 循环引用
4.3.1 内存泄漏的检测

可以使用 Python 的 memory-profiler 库来检测内存泄漏,例如:

代码语言:javascript
复制
pip install memory-profiler

编写内存泄漏检测脚本(test_memory.py):

代码语言:javascript
复制
from memory_profiler import profile
import asyncio
from main import get_todos

@profile
async def test_memory_leak():
    """测试内存泄漏"""
    for i in range(100):
        await get_todos()

asyncio.run(test_memory_leak())

运行内存泄漏检测脚本:

代码语言:javascript
复制
python -m memory_profiler test_memory.py
4.3.2 内存泄漏的解决方法
  • 确保数据库连接、HTTP 客户端等资源在使用后关闭
  • 使用上下文管理器(async with)来管理资源
  • 避免循环引用
  • 定期重启服务器
4.4 错误处理(try-except 的位置、HTTPException 的抛出)

在 FastAPI 异步接口中,错误处理非常重要。可以使用 try-except 语句来捕获错误,或者使用 HTTPException 类来抛出 HTTP 错误。

4.4.1 错误处理示例
代码语言:javascript
复制
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

@app.get("/todo/{id}")
async def get_todo(id: int):
    """获取单个todo的异步接口"""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
            response.raise_for_status()  # 抛出HTTP错误
            return response.json()
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=e.response.status_code, detail=e.response.text)
    except httpx.RequestError as e:
        raise HTTPException(status_code=500, detail=str(e))

五、Uvicorn 异步配置优化

5.1 进程数、线程数、工作进程类型的选择

在 Uvicorn 中,可以通过 --workers 参数来指定工作进程数,通过 --threads 参数来指定线程数,通过 --worker-class 参数来指定工作进程类型。

5.1.1 工作进程类型的选择

Uvicorn 支持两种工作进程类型:

  • 标准工作进程(默认):使用 asyncio 或 uvloop 事件循环,适合处理高并发、IO 密集型请求。
  • 线程化工作进程:使用 ThreadPoolExecutor,适合处理 CPU 密集型请求。
5.1.2 进程数、线程数的选择

工作进程数和线程数的选择取决于服务器的 CPU 核心数和内存大小。一般来说,工作进程数等于 CPU 核心数,线程数等于 CPU 核心数的 2-4 倍。

例如,服务器有 4 个 CPU 核心,可以使用以下配置:

代码语言:javascript
复制
uvicorn main:app --workers 4 --threads 8 --loop uvloop
5.2 事件循环的选择

在 Uvicorn 中,可以通过 --loop 参数来选择事件循环。uvloop 事件循环比 asyncio 事件循环的性能提升了约 2-4 倍,建议使用 uvloop 事件循环。

5.2.1 安装 uvloop

如果没有安装 uvloop,可以使用以下命令安装:

代码语言:javascript
复制
pip install uvloop
5.2.2 选择 uvloop 事件循环
代码语言:javascript
复制
uvicorn main:app --loop uvloop
5.3 超时配置

在 Uvicorn 中,可以通过 --timeout-keep-alive 参数来指定 HTTP keep-alive 超时时间,通过 --timeout-graceful-shutdown 参数来指定优雅关闭超时时间。

5.3.1 HTTP keep-alive 超时时间

HTTP keep-alive 超时时间是指服务器在处理完一个请求后,等待下一个请求的时间。如果超过这个时间没有请求,服务器会关闭连接。

代码语言:javascript
复制
uvicorn main:app --timeout-keep-alive 30
5.3.2 优雅关闭超时时间

优雅关闭超时时间是指服务器在收到关闭信号后,等待正在处理的请求完成的时间。如果超过这个时间还有请求正在处理,服务器会强制关闭连接。

代码语言:javascript
复制
uvicorn main:app --timeout-graceful-shutdown 60
5.4 日志配置

在 Uvicorn 中,可以通过 --log-level 参数来指定日志级别,通过 --log-config 参数来指定日志配置文件。

5.4.1 日志级别的选择

Uvicorn 支持以下日志级别:

  • debug:调试信息
  • info:普通信息
  • warning:警告信息
  • error:错误信息
  • critical:严重错误信息
代码语言:javascript
复制
uvicorn main:app --log-level info
5.4.2 日志配置文件的使用

可以使用以下命令指定日志配置文件(logging.conf):

代码语言:javascript
复制
uvicorn main:app --log-config logging.conf

logging.conf 文件的示例内容:

代码语言:javascript
复制
[loggers]
keys=root,uvicorn.error,uvicorn.access

[handlers]
keys=console

[formatters]
keys=generic

[logger_root]
level=INFO
handlers=console

[logger_uvicorn.error]
level=INFO
handlers=console
qualname=uvicorn.error
propagate=0

[logger_uvicorn.access]
level=INFO
handlers=console
qualname=uvicorn.access
propagate=0

[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stderr,)

[formatter_generic]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

六、总结与展望

6.1 常用做法
  • 对于 IO 密集型 API,使用异步函数定义接口路由,使用异步 HTTP 客户端和异步数据库查询
  • 对于 CPU 密集型 API,使用同步函数定义接口路由,或者使用 asyncio.to_thread () 函数将同步代码包装成异步代码
  • 使用 uvloop 事件循环提升性能
  • 合理配置 Uvicorn 的进程数、线程数、超时时间
  • 避免在异步接口中使用同步代码阻塞事件循环
  • 使用信号量限制并发请求数
  • 确保资源在使用后关闭,避免内存泄漏
  • 合理处理错误,提高 API 的可用性和可靠性
6.2 发展趋势

FastAPI 和 Uvicorn 的异步编程正在不断发展,未来可能会有以下改进:

  • 更好的异步数据库支持
  • 更强大的后台任务和定时任务支持
  • 更好的内存管理和性能优化
  • 更多的异步中间件和插件
  • 更好的文档和示例

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 摘要
  • 引言
  • 一、FastAPI 异步基础
    • 1.1 async/await 语法回顾
      • 1.1.1 协程的定义
      • 1.1.2 协程的运行
      • 1.1.3 协程的并发
    • 1.2 同步接口 vs 异步接口的定义
      • 1.2.1 同步接口的定义
      • 1.2.2 异步接口的定义
    • 1.3 同步 / 异步接口的性能对比
      • 1.3.1 压测工具的安装
      • 1.3.2 压测脚本的编写
      • 1.3.3 压测结果的对比
  • 二、Uvicorn 异步架构与原理
    • 2.1 Uvicorn 的历史与特点
    • 2.2 事件循环机制详解
      • 2.2.1 asyncio 事件循环
      • 2.2.2 uvloop 事件循环
      • 2.2.3 事件循环的切换
    • 2.3 Uvicorn 的启动流程
  • 三、异步编程实战场景
    • 3.1 场景 1:IO 密集型 API(调用外部 HTTP 接口、查询数据库)
      • 3.1.1 依赖注入中的异步
      • 3.1.2 异步 HTTP 请求
      • 3.1.3 异步数据库查询
    • 3.2 场景 2:实时消息推送(WebSocket)
      • 3.2.1 WebSocket 连接管理
      • 3.2.2 广播消息(用 broadcast 库)
    • 3.3 场景 3:后台任务与定时任务
      • 3.3.1 FastAPI 自带的 BackgroundTasks
      • 3.3.2 Celery 异步任务(结合 Redis)
      • 3.3.3 APScheduler 定时任务
  • 四、异步编程的坑与避坑指南
    • 4.1 同步代码阻塞事件循环的问题
      • 4.1.1 问题示例
      • 4.1.2 解决方法
    • 4.2 异步代码中的并发控制
      • 4.2.1 并发控制示例
    • 4.3 内存泄漏问题
      • 4.3.1 内存泄漏的检测
      • 4.3.2 内存泄漏的解决方法
    • 4.4 错误处理(try-except 的位置、HTTPException 的抛出)
      • 4.4.1 错误处理示例
  • 五、Uvicorn 异步配置优化
    • 5.1 进程数、线程数、工作进程类型的选择
      • 5.1.1 工作进程类型的选择
      • 5.1.2 进程数、线程数的选择
    • 5.2 事件循环的选择
      • 5.2.1 安装 uvloop
      • 5.2.2 选择 uvloop 事件循环
    • 5.3 超时配置
      • 5.3.1 HTTP keep-alive 超时时间
      • 5.3.2 优雅关闭超时时间
    • 5.4 日志配置
      • 5.4.1 日志级别的选择
      • 5.4.2 日志配置文件的使用
  • 六、总结与展望
    • 6.1 常用做法
    • 6.2 发展趋势
  • 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档