首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从零开始:FastAPI+SQLAlchemy+pymysql 连接 MySQL 全流程(建表、增删改查)

从零开始:FastAPI+SQLAlchemy+pymysql 连接 MySQL 全流程(建表、增删改查)

作者头像
玄同765
发布2026-01-14 14:25:11
发布2026-01-14 14:25:11
470
举报
在这里插入图片描述
在这里插入图片描述

【个人主页:玄同765

大语言模型(LLM)开发工程师中国传媒大学·数字媒体技术(智能交互与游戏设计) 深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调 技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️ 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 专栏传送门:LLM大模型开发 项目实战指南Python 从真零基础到纯文本 LLM 全栈实战​​​​​从零学 SQL + 大模型应用落地大模型开发小白专属:从 0 入门 Linux&Shell 「让AI交互更智能,让技术落地更高效」 欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!

摘要

这篇博客是为 FastAPI 新手准备的 MySQL 数据库连接全流程教程,详细讲解了如何安装 SQLAlchemy+pymysql,配置连接引擎与连接池,定义数据模型,初始化数据库,以及在 API 接口中实现增删改查操作,附带完整可运行代码、依赖注入用法和常见问题解决方法。


引言

在 FastAPI 开发中,经常需要与数据库进行交互。MySQL 是一款开源的关系型数据库管理系统,广泛应用于 Web 开发中。SQLAlchemy 是一款强大的 Python ORM(Object Relational Mapper)框架,它可以将 Python 对象与数据库表进行映射,让开发者可以使用 Python 代码来操作数据库,而不需要编写复杂的 SQL 语句。pymysql 是一款 Python 的 MySQL 驱动,它可以让 SQLAlchemy 连接到 MySQL 数据库。


一、前置准备

1.1 安装库

使用 pip 命令安装 SQLAlchemy 和 pymysql:

代码语言:javascript
复制
pip install sqlalchemy
pip install pymysql
1.2 安装 MySQL

如果你的电脑上没有安装 MySQL,可以根据操作系统下载安装:

  • Windows:下载 MySQL Installer,按照向导安装。
  • Mac:使用 Homebrew 安装,命令为brew install mysql
  • Linux(Ubuntu/Debian):使用 apt 命令安装,命令为sudo apt update && sudo apt install mysql-server
1.3 创建测试数据库

安装好 MySQL 后,使用 MySQL 客户端工具(如 MySQL Workbench、Navicat、命令行)连接到 MySQL 服务器,并创建一个测试数据库:

代码语言:javascript
复制
CREATE DATABASE IF NOT EXISTS fastapi_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

二、连接引擎配置

2.1 创建连接字符串

SQLAlchemy 需要使用连接字符串来连接到数据库。连接字符串的格式如下:

代码语言:javascript
复制
# MySQL连接字符串格式
DATABASE_URL = "mysql+pymysql://username:password@host:port/database_name"

参数说明:

  • username:MySQL 用户名(默认是 root)。
  • password:MySQL 密码。
  • host:MySQL 服务器地址(本地开发通常是localhost或 127.0.0.1)。
  • port:MySQL 服务器端口(默认是 3306)。
  • database_name:数据库名称(这里是 fastapi_db)。
2.2 配置连接引擎

使用 SQLAlchemy 的create_engine函数创建连接引擎:

代码语言:javascript
复制
from sqlalchemy import create_engine

# 连接字符串
DATABASE_URL = "mysql+pymysql://root:password@localhost:3306/fastapi_db"

# 创建连接引擎
engine = create_engine(
    DATABASE_URL,
    pool_size=10,          # 连接池大小
    max_overflow=20,       # 连接池最大溢出数
    pool_recycle=3600,     # 连接回收时间(秒),避免连接超时
    pool_pre_ping=True,    # 预检查连接是否有效
    echo=True              # 是否打印SQL语句,用于调试
)

连接池参数说明:

  • pool_size:连接池中的空闲连接数。
  • max_overflow:连接池中的最大连接数(空闲连接数 + 最大溢出数)。
  • pool_recycle:连接回收时间,避免连接超时(MySQL 的 wait_timeout 默认是 8 小时,建议设置为小于 8 小时的值)。
  • pool_pre_ping:预检查连接是否有效,避免连接失效。
  • echo:是否打印 SQL 语句,用于调试。

三、数据模型定义

3.1 创建基类

使用 SQLAlchemy 的declarative_base函数创建基类:

代码语言:javascript
复制
from sqlalchemy.ext.declarative import declarative_base

# 创建基类
Base = declarative_base()
3.2 定义数据模型

继承基类,定义数据模型,数据模型与数据库表进行映射:

代码语言:javascript
复制
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text
from datetime import datetime

# 定义用户数据模型
class User(Base):
    __tablename__ = "users"  # 数据库表名

    id = Column(Integer, primary_key=True, index=True)  # 主键,创建索引
    username = Column(String(50), unique=True, index=True, nullable=False)  # 用户名,唯一,创建索引,非空
    email = Column(String(100), unique=True, index=True, nullable=False)  # 邮箱,唯一,创建索引,非空
    password = Column(String(100), nullable=False)  # 密码,非空
    is_active = Column(Boolean, default=True)  # 是否激活,默认值为True
    created_at = Column(DateTime, default=datetime.now)  # 创建时间,默认值为当前时间
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)  # 更新时间,默认值为当前时间,更新时自动更新

# 定义文章数据模型
class Post(Base):
    __tablename__ = "posts"  # 数据库表名

    id = Column(Integer, primary_key=True, index=True)  # 主键,创建索引
    title = Column(String(200), index=True, nullable=False)  # 标题,创建索引,非空
    content = Column(Text, nullable=False)  # 内容,非空
    author_id = Column(Integer, index=True, nullable=False)  # 作者ID,创建索引,非空
    is_published = Column(Boolean, default=False)  # 是否发布,默认值为False
    created_at = Column(DateTime, default=datetime.now)  # 创建时间,默认值为当前时间
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)  # 更新时间,默认值为当前时间,更新时自动更新

字段类型说明:

SQLAlchemy 类型

对应 MySQL 类型

说明

Integer

INT

整数类型

String(length)

VARCHAR(length)

可变长度字符串类型

Float

FLOAT

浮点类型

Boolean

TINYINT(1)

布尔类型(0 表示 False,1 表示 True)

DateTime

DATETIME

日期时间类型

Text

TEXT

长文本类型


四、数据库初始化

4.1 创建会话工厂

使用 SQLAlchemy 的sessionmaker函数创建会话工厂:

代码语言:javascript
复制
from sqlalchemy.orm import sessionmaker

# 创建会话工厂
SessionLocal = sessionmaker(
    autocommit=False,  # 自动提交
    autoflush=False,  # 自动刷新
    bind=engine        # 绑定连接引擎
)
4.2 创建数据库表

使用基类的metadata.create_all方法创建数据库表:

代码语言:javascript
复制
# 创建数据库表
Base.metadata.create_all(bind=engine)
4.3 创建依赖注入函数

使用 FastAPI 的依赖注入系统,创建一个函数来提供数据库会话:

代码语言:javascript
复制
from fastapi import Depends
from typing import Generator

# 创建依赖注入函数
def get_db() -> Generator:
    """提供数据库会话的依赖注入函数"""
    db = SessionLocal()
    try:
        yield db  # 返回数据库会话
    finally:
        db.close()  # 关闭数据库会话

五、API 接口实现

5.1 定义 Pydantic 数据模型

使用 FastAPI 的 Pydantic 数据模型,定义请求和响应数据模型:

代码语言:javascript
复制
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime

# 用户请求数据模型
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

# 用户响应数据模型
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool
    created_at: datetime
    updated_at: datetime

    class Config:
        orm_mode = True  # 启用ORM模式,允许从SQLAlchemy对象直接转换

# 用户更新请求数据模型
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    password: Optional[str] = None
    is_active: Optional[bool] = None

# 文章请求数据模型
class PostCreate(BaseModel):
    title: str
    content: str
    author_id: int
    is_published: Optional[bool] = False

# 文章响应数据模型
class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    author_id: int
    is_published: bool
    created_at: datetime
    updated_at: datetime

    class Config:
        orm_mode = True

# 文章更新请求数据模型
class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    author_id: Optional[int] = None
    is_published: Optional[bool] = None
5.2 实现用户 CRUD 操作的 API 接口
代码语言:javascript
复制
from fastapi import APIRouter, HTTPException, status
from typing import List

# 创建用户路由组
user_router = APIRouter(
    prefix="/users",  # 路由前缀
    tags=["users"],  # 标签,用于API文档分组
)

# 创建用户
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate, db: SessionLocal = Depends(get_db)):
    """创建用户"""
    # 检查用户名是否已存在
    existing_user = db.query(User).filter(User.username == user.username).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )
    # 检查邮箱是否已存在
    existing_email = db.query(User).filter(User.email == user.email).first()
    if existing_email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already exists"
        )
    # 创建用户
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

# 查询所有用户
@user_router.get("/", response_model=List[UserResponse])
def read_users(skip: int = 0, limit: int = 10, db: SessionLocal = Depends(get_db)):
    """查询所有用户"""
    users = db.query(User).offset(skip).limit(limit).all()
    return users

# 查询单个用户
@user_router.get("/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: SessionLocal = Depends(get_db)):
    """查询单个用户"""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    return user

# 更新用户
@user_router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user: UserUpdate, db: SessionLocal = Depends(get_db)):
    """更新用户"""
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    # 更新用户信息
    for key, value in user.dict(exclude_unset=True).items():
        setattr(db_user, key, value)
    db.commit()
    db.refresh(db_user)
    return db_user

# 删除用户
@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: SessionLocal = Depends(get_db)):
    """删除用户"""
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    db.delete(db_user)
    db.commit()
5.3 实现文章 CRUD 操作的 API 接口
代码语言:javascript
复制
# 创建文章路由组
post_router = APIRouter(
    prefix="/posts",  # 路由前缀
    tags=["posts"],  # 标签,用于API文档分组
)

# 创建文章
@post_router.post("/", response_model=PostResponse, status_code=status.HTTP_201_CREATED)
def create_post(post: PostCreate, db: SessionLocal = Depends(get_db)):
    """创建文章"""
    # 检查作者是否存在
    existing_user = db.query(User).filter(User.id == post.author_id).first()
    if not existing_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {post.author_id} not found"
        )
    # 创建文章
    db_post = Post(**post.dict())
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post

# 查询所有文章
@post_router.get("/", response_model=List[PostResponse])
def read_posts(skip: int = 0, limit: int = 10, db: SessionLocal = Depends(get_db)):
    """查询所有文章"""
    posts = db.query(Post).offset(skip).limit(limit).all()
    return posts

# 查询单个文章
@post_router.get("/{post_id}", response_model=PostResponse)
def read_post(post_id: int, db: SessionLocal = Depends(get_db)):
    """查询单个文章"""
    post = db.query(Post).filter(Post.id == post_id).first()
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Post with id {post_id} not found"
        )
    return post

# 更新文章
@post_router.put("/{post_id}", response_model=PostResponse)
def update_post(post_id: int, post: PostUpdate, db: SessionLocal = Depends(get_db)):
    """更新文章"""
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if not db_post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Post with id {post_id} not found"
        )
    # 更新文章信息
    for key, value in post.dict(exclude_unset=True).items():
        setattr(db_post, key, value)
    db.commit()
    db.refresh(db_post)
    return db_post

# 删除文章
@post_router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(post_id: int, db: SessionLocal = Depends(get_db)):
    """删除文章"""
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if not db_post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Post with id {post_id} not found"
        )
    db.delete(db_post)
    db.commit()
5.4 启动 FastAPI 应用程序
代码语言:javascript
复制
from fastapi import FastAPI

# 创建FastAPI应用程序实例
app = FastAPI(
    title="FastAPI+SQLAlchemy+pymysql示例",
    description="这是一个使用FastAPI+SQLAlchemy+pymysql连接MySQL的示例应用程序",
    version="1.0.0"
)

# 将路由组添加到FastAPI应用程序实例中
app.include_router(user_router)
app.include_router(post_router)

# 启动应用程序
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

六、常见问题解决

6.1 连接超时问题

现象:服务器报错pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')解决方法:在连接引擎配置中,设置pool_recycle参数为小于 MySQL 的wait_timeout的值(MySQL 的wait_timeout默认是 28800 秒,建议设置为 3600 秒)。

6.2 连接池满问题

现象:服务器报错sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out, timeout 30解决方法:在连接引擎配置中,适当增加pool_sizemax_overflow参数的值。

6.3 字段验证失败问题

现象:服务器报错pydantic.error_wrappers.ValidationError解决方法:检查请求数据是否符合 Pydantic 数据模型的要求,比如字段类型、长度、格式等。

6.4 外键约束错误问题

现象:服务器报错sqlalchemy.exc.IntegrityError: (pymysql.err.IntegrityError) (1452, 'Cannot add or update a child row: a foreign key constraint fails')解决方法:检查外键字段的值是否存在于对应的父表中。

6.5 查询性能优化问题

现象:查询数据的响应时间过长解决方法

  1. 在查询频繁的字段上创建索引(使用index=True参数)。
  2. 避免查询不必要的字段(使用db.query(User.id, User.username).all())。
  3. 使用分页查询(使用offsetlimit方法)。
  4. 使用预加载关联数据(使用db.query(User).options(joinedload(User.posts)).all())。

七、总结与展望

7.1 回顾整个流程
  1. 安装 SQLAlchemy 和 pymysql。
  2. 安装 MySQL 并创建测试数据库。
  3. 配置连接引擎与连接池。
  4. 定义数据模型。
  5. 初始化数据库。
  6. 实现 API 接口。
7.2 后续学习内容
  1. 异步 SQLAlchemy:使用sqlalchemy.ext.asyncio实现异步数据库操作。
  2. 数据库迁移工具 Alembic:管理数据库表结构的变更。
  3. 数据库连接池优化:根据实际场景调整连接池参数。
  4. 数据库查询优化:使用 SQLAlchemy 的查询 API 进行优化。

参考资料

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=av3ho2vbtmi

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-01-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 摘要
  • 引言
  • 一、前置准备
    • 1.1 安装库
    • 1.2 安装 MySQL
    • 1.3 创建测试数据库
  • 二、连接引擎配置
    • 2.1 创建连接字符串
    • 2.2 配置连接引擎
  • 三、数据模型定义
    • 3.1 创建基类
    • 3.2 定义数据模型
  • 四、数据库初始化
    • 4.1 创建会话工厂
    • 4.2 创建数据库表
    • 4.3 创建依赖注入函数
  • 五、API 接口实现
    • 5.1 定义 Pydantic 数据模型
    • 5.2 实现用户 CRUD 操作的 API 接口
    • 5.3 实现文章 CRUD 操作的 API 接口
    • 5.4 启动 FastAPI 应用程序
  • 六、常见问题解决
    • 6.1 连接超时问题
    • 6.2 连接池满问题
    • 6.3 字段验证失败问题
    • 6.4 外键约束错误问题
    • 6.5 查询性能优化问题
  • 七、总结与展望
    • 7.1 回顾整个流程
    • 7.2 后续学习内容
  • 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档