
数据持久化:别让你的数据"重启就没了"——SQLx/Diesel 入门指南
上篇咱们写了个 TODO API,数据存在 HashMap 里。听着挺好,直到你重启服务器...
啪!数据全没了。
用户:"我辛辛苦苦加的 TODO 呢?" 你:"呃...重启没了..." 用户:"???"
这就是为什么需要数据库。
数据库就像个保险柜,你的数据存进去,服务器重启、断电、甚至世界末日(只要硬盘没坏),数据都在。
今天咱们聊聊 Rust 里怎么操作数据库。主要有两个选择:
我的建议:新手用 SQLx,直接写 SQL,灵活又直观。ORM 爱好者随意。
生活化类比:
今天咱们把数据放进保险柜!
特性 | SQLx | Diesel |
|---|---|---|
类型 | 异步 SQL 库 | ORM |
SQL 检查 | 编译期 ✅ | 运行期 |
学习曲线 | 平缓 | 较陡 |
灵活性 | 高(直接写 SQL) | 中(DSL) |
数据库支持 | PostgreSQL, MySQL, SQLite | PostgreSQL, MySQL, SQLite |
迁移工具 | 内置 | 内置 |
我的选择: SQLx。为什么?
问题: 每次查询都新建数据库连接?慢死!
解决: 连接池。预先创建一堆连接,用完放回池里,下次复用。
类比:
问题: 数据库表结构变了怎么办?手动改?
解决: 迁移脚本。用代码管理数据库结构变化。
类比:
cargo new my_db_app
cd my_db_app
修改 Cargo.toml:
[package]
name = "my_db_app"
version = "0.1.0"
edition = "2021"
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "sqlite"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.35", features = ["full"] }
dotenvy = "0.15" # 读取 .env 文件
创建 .env 文件:
# PostgreSQL
DATABASE_URL=postgres://username:password@localhost:5432/mydb
# 或者 SQLite (开发方便)
DATABASE_URL=sqlite:./app.db
吐槽: 为啥推荐 SQLite 开发?因为不用装数据库服务!一个文件搞定,适合本地测试。
用 SQLx CLI 创建迁移:
cargo install sqlx-cli
# 创建迁移
sqlx migrate create create_users_table
这会在 migrations/ 目录创建文件:
-- migrations/20240101120000_create_users_table.sql
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
运行迁移:
sqlx migrate run
输出:
Applied 1 migration: create_users_table
// src/main.rs
use sqlx::{SqlitePool, Row};
use serde::Serialize;
#[derive(Serialize)]
struct User {
id: i64,
name: String,
email: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 从环境变量读取连接字符串
dotenvy::dotenv()?;
let database_url = std::env::var("DATABASE_URL")?;
// 创建连接池
let pool = SqlitePool::connect(&database_url).await?;
println!("✅ 数据库连接成功");
// 创建 (Create)
let result = sqlx::query(
"INSERT INTO users (name, email) VALUES (?, ?)"
)
.bind("Larry")
.bind("larry@example.com")
.execute(&pool)
.await?;
println!("插入了 {} 行", result.rows_affected());
// 查询 (Read)
let users = sqlx::query_as::<_, User>(
"SELECT id, name, email FROM users"
)
.fetch_all(&pool)
.await?;
println!("用户列表:");
for user in users {
println!(" - {} ({})", user.name, user.email);
}
// 更新 (Update)
let result = sqlx::query(
"UPDATE users SET name = ? WHERE email = ?"
)
.bind("Larry Updated")
.bind("larry@example.com")
.execute(&pool)
.await?;
println!("更新了 {} 行", result.rows_affected());
// 删除 (Delete)
let result = sqlx::query(
"DELETE FROM users WHERE email = ?"
)
.bind("larry@example.com")
.execute(&pool)
.await?;
println!("删除了 {} 行", result.rows_affected());
Ok(())
}
运行:
cargo run
输出:
✅ 数据库连接成功
插入了 1 行
用户列表:
- Larry (larry@example.com)
更新了 1 行
删除了 1 行
SQLx 的杀手锏:query! 宏,编译期检查 SQL!
use sqlx::SqlitePool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv()?;
let pool = SqlitePool::connect(&std::env::var("DATABASE_URL")?).await?;
// query! 宏 - 编译期检查 SQL
let user = sqlx::query!(
"SELECT id, name, email FROM users WHERE id = ?",
)
.fetch_optional(&pool)
.await?;
match user {
Some(u) => println!("找到用户:{} ({})", u.name, u.email),
None => println!("用户不存在"),
}
Ok(())
}
爽在哪?
吐槽: 第一次用会觉得魔法,用惯了回不去。
// src/db.rs
use sqlx::{SqlitePool, Error};
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct User {
pub id: i64,
pub name: String,
pub email: String,
}
#[derive(Debug, Deserialize)]
pub struct CreateUser {
pub name: String,
pub email: String,
}
#[derive(Debug, Deserialize)]
pub struct UpdateUser {
pub name: Option<String>,
pub email: Option<String>,
}
pub struct UserRepository {
pool: SqlitePool,
}
impl UserRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
// 创建用户
pub async fn create(&self, user: &CreateUser) -> Result<User, Error> {
let result = sqlx::query!(
"INSERT INTO users (name, email) VALUES (?, ?)",
user.name,
user.email
)
.execute(&self.pool)
.await?;
let id = result.last_insert_rowid();
Ok(User {
id,
name: user.name.clone(),
email: user.email.clone(),
})
}
// 查询所有用户
pub async fn find_all(&self) -> Result<Vec<User>, Error> {
let users = sqlx::query_as!(
User,
"SELECT id, name, email FROM users ORDER BY id"
)
.fetch_all(&self.pool)
.await?;
Ok(users)
}
// 根据 ID 查询
pub async fn find_by_id(&self, id: i64) -> Result<Option<User>, Error> {
let user = sqlx::query_as!(
User,
"SELECT id, name, email FROM users WHERE id = ?",
id
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
// 更新用户
pub async fn update(&self, id: i64, user: &UpdateUser) -> Result<Option<User>, Error> {
// 先查询现有用户
let existing = self.find_by_id(id).await?;
if existing.is_none() {
return Ok(None);
}
let existing = existing.unwrap();
// 构建更新 SQL
let mut updates = Vec::new();
let mut params = Vec::new();
if let Some(name) = &user.name {
updates.push("name = ?");
params.push(name.clone());
}
if let Some(email) = &user.email {
updates.push("email = ?");
params.push(email.clone());
}
if updates.is_empty() {
return Ok(Some(existing));
}
params.push(id);
let sql = format!(
"UPDATE users SET {} WHERE id = ?",
updates.join(", ")
);
// 动态 SQL 用 query()
let mut query = sqlx::query(&sql);
for param in params {
query = query.bind(param);
}
query.execute(&self.pool).await?;
self.find_by_id(id).await
}
// 删除用户
pub async fn delete(&self, id: i64) -> Result<bool, Error> {
let result = sqlx::query!(
"DELETE FROM users WHERE id = ?",
id
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > )
}
}
// src/main.rs
mod db;
use db::{UserRepository, CreateUser, UpdateUser};
use sqlx::SqlitePool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv()?;
let pool = SqlitePool::connect(&std::env::var("DATABASE_URL")?).await?;
let repo = UserRepository::new(pool);
// 创建
let user = repo.create(&CreateUser {
name: "Larry".to_string(),
email: "larry@example.com".to_string(),
}).await?;
println!("创建用户:{:?}", user);
// 查询
let users = repo.find_all().await?;
println!("所有用户:{:?}", users);
// 更新
let updated = repo.update(user.id, &UpdateUser {
name: Some("Larry Updated".to_string()),
email: None,
}).await?;
println!("更新用户:{:?}", updated);
// 删除
let deleted = repo.delete(user.id).await?;
println!("删除成功:{}", deleted);
Ok(())
}
use sqlx::{SqlitePool, Transaction, Sqlite};
async fn transfer_money(
pool: &SqlitePool,
from_id: i64,
to_id: i64,
amount: i64,
) -> Result<(), sqlx::Error> {
// 开启事务
let mut tx = pool.begin().await?;
// 检查余额
let from_balance = sqlx::query_scalar!(
"SELECT balance FROM accounts WHERE id = ?",
from_id
)
.fetch_one(&mut *tx)
.await?;
if from_balance < amount {
return Err(sqlx::Error::RowNotFound); // 余额不足
}
// 扣款
sqlx::query!(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
amount,
from_id
)
.execute(&mut *tx)
.await?;
// 收款
sqlx::query!(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
amount,
to_id
)
.execute(&mut *tx)
.await?;
// 提交事务
tx.commit().await?;
println!("✅ 转账成功");
Ok(())
}
事务的特点:
现象: sqlx migrate 命令找不到
解决:
cargo install sqlx-cli
吐槽: 这玩意儿得单独装,我第一次找半天。
现象: query! 宏报错,说连不上数据库
原因: query! 宏编译时要连数据库检查 SQL
解决:
# 设置 DATABASE_URL 环境变量
export DATABASE_URL=sqlite:./app.db
# 或者用 .env 文件
echo "DATABASE_URL=sqlite:./app.db" > .env
错误:
// PostgreSQL 写法
let id = result.id; // ❌ SQLite 不这样
正确:
// SQLite
let id = result.last_insert_rowid();
现象: 程序结束,连接没释放
解决: SqlitePool 有 Drop 实现,一般不用管。但显式关闭更好:
pool.close().await;
// src/models.rs
use sqlx::{SqlitePool, Error};
use serde::{Serialize, Deserialize};
use chrono::{DateTime, Utc};
// 文章
#[derive(Debug, Serialize, Deserialize)]
pub struct Post {
pub id: i64,
pub title: String,
pub content: String,
pub author_id: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
// 评论
#[derive(Debug, Serialize, Deserialize)]
pub struct Comment {
pub id: i64,
pub post_id: i64,
pub author_name: String,
pub content: String,
pub created_at: DateTime<Utc>,
}
// 文章仓库
pub struct PostRepository {
pool: SqlitePool,
}
impl PostRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
// 创建文章
pub async fn create(
&self,
title: &str,
content: &str,
author_id: i64,
) -> Result<Post, Error> {
let now = Utc::now();
let result = sqlx::query!(
"INSERT INTO posts (title, content, author_id, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)",
title,
content,
author_id,
now,
now
)
.execute(&self.pool)
.await?;
let id = result.last_insert_rowid();
Ok(Post {
id,
title: title.to_string(),
content: content.to_string(),
author_id,
created_at: now,
updated_at: now,
})
}
// 获取文章列表(带分页)
pub async fn find_paginated(
&self,
page: i64,
per_page: i64,
) -> Result<Vec<Post>, Error> {
let offset = (page - ) * per_page;
let posts = sqlx::query_as!(
Post,
"SELECT id, title, content, author_id, created_at, updated_at
FROM posts
ORDER BY created_at DESC
LIMIT ? OFFSET ?",
per_page,
offset
)
.fetch_all(&self.pool)
.await?;
Ok(posts)
}
// 获取文章详情(带评论)
pub async fn find_with_comments(
&self,
post_id: i64,
) -> Result<(Post, Vec<Comment>), Error> {
let mut tx = self.pool.begin().await?;
let post = sqlx::query_as!(
Post,
"SELECT id, title, content, author_id, created_at, updated_at
FROM posts WHERE id = ?",
post_id
)
.fetch_optional(&mut *tx)
.await?;
let post = post.ok_or(sqlx::Error::RowNotFound)?;
let comments = sqlx::query_as!(
Comment,
"SELECT id, post_id, author_name, content, created_at
FROM comments
WHERE post_id = ?
ORDER BY created_at ASC",
post_id
)
.fetch_all(&mut *tx)
.await?;
tx.commit().await?;
Ok((post, comments))
}
// 添加评论
pub async fn add_comment(
&self,
post_id: i64,
author_name: &str,
content: &str,
) -> Result<Comment, Error> {
let now = Utc::now();
let result = sqlx::query!(
"INSERT INTO comments (post_id, author_name, content, created_at)
VALUES (?, ?, ?, ?)",
post_id,
author_name,
content,
now
)
.execute(&self.pool)
.await?;
let id = result.last_insert_rowid();
Ok(Comment {
id,
post_id,
author_name: author_name.to_string(),
content: content.to_string(),
created_at: now,
})
}
}
数据库迁移:
-- 文章表
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
author_id INTEGER NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
-- 评论表
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL,
author_name TEXT NOT NULL,
content TEXT NOT NULL,
created_at DATETIME NOT NULL,
FOREIGN KEY (post_id) REFERENCES posts(id)
);
-- 索引
CREATE INDEX IF NOT EXISTS idx_posts_author ON posts(author_id);
CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id);

核心要点:
金句:
数据库是保险柜,HashMap 是便签纸。
连接池是共享单车,骑完放回去别人接着用。
事务是原子弹,要么全爆要么不爆,没有"爆一半"。
下篇预告:
数据库有了,数据怎么传给前端?JSON 序列化!下篇咱们聊聊 序列化,用 serde 把 Rust 结构体变成 JSON,顺便聊聊性能优化!