

【个人主页:玄同765】
大语言模型(LLM)开发工程师|中国传媒大学·数字媒体技术(智能交互与游戏设计) 深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调 技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️ 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 专栏传送门:LLM大模型开发 项目实战指南、Python 从真零基础到纯文本 LLM 全栈实战、从零学 SQL + 大模型应用落地、大模型开发小白专属:从 0 入门 Linux&Shell 「让AI交互更智能,让技术落地更高效」 欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!
这篇博客是为 FastAPI 新手准备的 MySQL 数据库连接全流程教程,详细讲解了如何安装 SQLAlchemy+pymysql,配置连接引擎与连接池,定义数据模型,初始化数据库,以及在 API 接口中实现增删改查操作,附带完整可运行代码、依赖注入用法和常见问题解决方法。
在 FastAPI 开发中,经常需要与数据库进行交互。MySQL 是一款开源的关系型数据库管理系统,广泛应用于 Web 开发中。SQLAlchemy 是一款强大的 Python ORM(Object Relational Mapper)框架,它可以将 Python 对象与数据库表进行映射,让开发者可以使用 Python 代码来操作数据库,而不需要编写复杂的 SQL 语句。pymysql 是一款 Python 的 MySQL 驱动,它可以让 SQLAlchemy 连接到 MySQL 数据库。
使用 pip 命令安装 SQLAlchemy 和 pymysql:
pip install sqlalchemy
pip install pymysql如果你的电脑上没有安装 MySQL,可以根据操作系统下载安装:
brew install mysql。sudo apt update && sudo apt install mysql-server。安装好 MySQL 后,使用 MySQL 客户端工具(如 MySQL Workbench、Navicat、命令行)连接到 MySQL 服务器,并创建一个测试数据库:
CREATE DATABASE IF NOT EXISTS fastapi_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;SQLAlchemy 需要使用连接字符串来连接到数据库。连接字符串的格式如下:
# MySQL连接字符串格式
DATABASE_URL = "mysql+pymysql://username:password@host:port/database_name"参数说明:
username:MySQL 用户名(默认是 root)。password:MySQL 密码。host:MySQL 服务器地址(本地开发通常是localhost或 127.0.0.1)。port:MySQL 服务器端口(默认是 3306)。database_name:数据库名称(这里是 fastapi_db)。使用 SQLAlchemy 的create_engine函数创建连接引擎:
from sqlalchemy import create_engine
# 连接字符串
DATABASE_URL = "mysql+pymysql://root:password@localhost:3306/fastapi_db"
# 创建连接引擎
engine = create_engine(
DATABASE_URL,
pool_size=10, # 连接池大小
max_overflow=20, # 连接池最大溢出数
pool_recycle=3600, # 连接回收时间(秒),避免连接超时
pool_pre_ping=True, # 预检查连接是否有效
echo=True # 是否打印SQL语句,用于调试
)连接池参数说明:
pool_size:连接池中的空闲连接数。max_overflow:连接池中的最大连接数(空闲连接数 + 最大溢出数)。pool_recycle:连接回收时间,避免连接超时(MySQL 的 wait_timeout 默认是 8 小时,建议设置为小于 8 小时的值)。pool_pre_ping:预检查连接是否有效,避免连接失效。echo:是否打印 SQL 语句,用于调试。使用 SQLAlchemy 的declarative_base函数创建基类:
from sqlalchemy.ext.declarative import declarative_base
# 创建基类
Base = declarative_base()继承基类,定义数据模型,数据模型与数据库表进行映射:
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text
from datetime import datetime
# 定义用户数据模型
class User(Base):
__tablename__ = "users" # 数据库表名
id = Column(Integer, primary_key=True, index=True) # 主键,创建索引
username = Column(String(50), unique=True, index=True, nullable=False) # 用户名,唯一,创建索引,非空
email = Column(String(100), unique=True, index=True, nullable=False) # 邮箱,唯一,创建索引,非空
password = Column(String(100), nullable=False) # 密码,非空
is_active = Column(Boolean, default=True) # 是否激活,默认值为True
created_at = Column(DateTime, default=datetime.now) # 创建时间,默认值为当前时间
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # 更新时间,默认值为当前时间,更新时自动更新
# 定义文章数据模型
class Post(Base):
__tablename__ = "posts" # 数据库表名
id = Column(Integer, primary_key=True, index=True) # 主键,创建索引
title = Column(String(200), index=True, nullable=False) # 标题,创建索引,非空
content = Column(Text, nullable=False) # 内容,非空
author_id = Column(Integer, index=True, nullable=False) # 作者ID,创建索引,非空
is_published = Column(Boolean, default=False) # 是否发布,默认值为False
created_at = Column(DateTime, default=datetime.now) # 创建时间,默认值为当前时间
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # 更新时间,默认值为当前时间,更新时自动更新字段类型说明:
SQLAlchemy 类型 | 对应 MySQL 类型 | 说明 |
|---|---|---|
Integer | INT | 整数类型 |
String(length) | VARCHAR(length) | 可变长度字符串类型 |
Float | FLOAT | 浮点类型 |
Boolean | TINYINT(1) | 布尔类型(0 表示 False,1 表示 True) |
DateTime | DATETIME | 日期时间类型 |
Text | TEXT | 长文本类型 |
使用 SQLAlchemy 的sessionmaker函数创建会话工厂:
from sqlalchemy.orm import sessionmaker
# 创建会话工厂
SessionLocal = sessionmaker(
autocommit=False, # 自动提交
autoflush=False, # 自动刷新
bind=engine # 绑定连接引擎
)使用基类的metadata.create_all方法创建数据库表:
# 创建数据库表
Base.metadata.create_all(bind=engine)使用 FastAPI 的依赖注入系统,创建一个函数来提供数据库会话:
from fastapi import Depends
from typing import Generator
# 创建依赖注入函数
def get_db() -> Generator:
"""提供数据库会话的依赖注入函数"""
db = SessionLocal()
try:
yield db # 返回数据库会话
finally:
db.close() # 关闭数据库会话使用 FastAPI 的 Pydantic 数据模型,定义请求和响应数据模型:
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
# 用户请求数据模型
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
# 用户响应数据模型
class UserResponse(BaseModel):
id: int
username: str
email: str
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True # 启用ORM模式,允许从SQLAlchemy对象直接转换
# 用户更新请求数据模型
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
is_active: Optional[bool] = None
# 文章请求数据模型
class PostCreate(BaseModel):
title: str
content: str
author_id: int
is_published: Optional[bool] = False
# 文章响应数据模型
class PostResponse(BaseModel):
id: int
title: str
content: str
author_id: int
is_published: bool
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
# 文章更新请求数据模型
class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
author_id: Optional[int] = None
is_published: Optional[bool] = Nonefrom fastapi import APIRouter, HTTPException, status
from typing import List
# 创建用户路由组
user_router = APIRouter(
prefix="/users", # 路由前缀
tags=["users"], # 标签,用于API文档分组
)
# 创建用户
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate, db: SessionLocal = Depends(get_db)):
"""创建用户"""
# 检查用户名是否已存在
existing_user = db.query(User).filter(User.username == user.username).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists"
)
# 检查邮箱是否已存在
existing_email = db.query(User).filter(User.email == user.email).first()
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already exists"
)
# 创建用户
db_user = User(**user.dict())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 查询所有用户
@user_router.get("/", response_model=List[UserResponse])
def read_users(skip: int = 0, limit: int = 10, db: SessionLocal = Depends(get_db)):
"""查询所有用户"""
users = db.query(User).offset(skip).limit(limit).all()
return users
# 查询单个用户
@user_router.get("/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: SessionLocal = Depends(get_db)):
"""查询单个用户"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
return user
# 更新用户
@user_router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user: UserUpdate, db: SessionLocal = Depends(get_db)):
"""更新用户"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
# 更新用户信息
for key, value in user.dict(exclude_unset=True).items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
# 删除用户
@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: SessionLocal = Depends(get_db)):
"""删除用户"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
db.delete(db_user)
db.commit()# 创建文章路由组
post_router = APIRouter(
prefix="/posts", # 路由前缀
tags=["posts"], # 标签,用于API文档分组
)
# 创建文章
@post_router.post("/", response_model=PostResponse, status_code=status.HTTP_201_CREATED)
def create_post(post: PostCreate, db: SessionLocal = Depends(get_db)):
"""创建文章"""
# 检查作者是否存在
existing_user = db.query(User).filter(User.id == post.author_id).first()
if not existing_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {post.author_id} not found"
)
# 创建文章
db_post = Post(**post.dict())
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
# 查询所有文章
@post_router.get("/", response_model=List[PostResponse])
def read_posts(skip: int = 0, limit: int = 10, db: SessionLocal = Depends(get_db)):
"""查询所有文章"""
posts = db.query(Post).offset(skip).limit(limit).all()
return posts
# 查询单个文章
@post_router.get("/{post_id}", response_model=PostResponse)
def read_post(post_id: int, db: SessionLocal = Depends(get_db)):
"""查询单个文章"""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Post with id {post_id} not found"
)
return post
# 更新文章
@post_router.put("/{post_id}", response_model=PostResponse)
def update_post(post_id: int, post: PostUpdate, db: SessionLocal = Depends(get_db)):
"""更新文章"""
db_post = db.query(Post).filter(Post.id == post_id).first()
if not db_post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Post with id {post_id} not found"
)
# 更新文章信息
for key, value in post.dict(exclude_unset=True).items():
setattr(db_post, key, value)
db.commit()
db.refresh(db_post)
return db_post
# 删除文章
@post_router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(post_id: int, db: SessionLocal = Depends(get_db)):
"""删除文章"""
db_post = db.query(Post).filter(Post.id == post_id).first()
if not db_post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Post with id {post_id} not found"
)
db.delete(db_post)
db.commit()from fastapi import FastAPI
# 创建FastAPI应用程序实例
app = FastAPI(
title="FastAPI+SQLAlchemy+pymysql示例",
description="这是一个使用FastAPI+SQLAlchemy+pymysql连接MySQL的示例应用程序",
version="1.0.0"
)
# 将路由组添加到FastAPI应用程序实例中
app.include_router(user_router)
app.include_router(post_router)
# 启动应用程序
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)现象:服务器报错pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')解决方法:在连接引擎配置中,设置pool_recycle参数为小于 MySQL 的wait_timeout的值(MySQL 的wait_timeout默认是 28800 秒,建议设置为 3600 秒)。
现象:服务器报错sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out, timeout 30解决方法:在连接引擎配置中,适当增加pool_size和max_overflow参数的值。
现象:服务器报错pydantic.error_wrappers.ValidationError解决方法:检查请求数据是否符合 Pydantic 数据模型的要求,比如字段类型、长度、格式等。
现象:服务器报错sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1452, 'Cannot add or update a child row: a foreign key constraint fails')解决方法:检查外键字段的值是否存在于对应的父表中。
现象:查询数据的响应时间过长解决方法:
index=True参数)。db.query(User.id, User.username).all())。offset和limit方法)。db.query(User).options(joinedload(User.posts)).all())。sqlalchemy.ext.asyncio实现异步数据库操作。我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=av3ho2vbtmi