
"共享状态就像共享牙刷——可以用,但得加把锁!"
上篇我们说了,Rust 推崇消息传递,不推荐共享状态。但现实是:有时候你就是需要共享状态。
比如:
这时候怎么办?难道要绕一大圈用 channel 吗?
当然不是!Rust 提供了安全的共享状态工具:Mutex、RwLock、Arc。它们的核心思想是:用类型系统和运行时锁来保证安全。
生活化类比:
想象一个公共卫生间:
今天我们就来学习这些工具,以及 Rust 如何通过 Send 和 Sync trait 在编译时保证线程安全。
Mutex 是 Mutual Exclusion(互斥)的缩写。它的规则很简单:
Rust 的 Mutex 特点:
Arc 是 Atomic Reference Counting 的缩写。它是线程安全版本的 Rc。
为什么需要 Arc?
生活化类比:
Rc = 图书馆的书(不能带出图书馆)Arc = 电子书(可以多人同时拥有副本)RwLock 是 Read-Write Lock 的缩写。它比 Mutex 更灵活:
适用场景: 读多写少的情况,比如配置、缓存。
这两个 trait 是 Rust 并发安全的基石:
Send:类型可以安全地转移到另一个线程Sync:类型的引用 &T 可以安全地转移到另一个线程(即类型可以安全地共享)规则:
Send + SyncT 是 Send + Sync,那么 Arc<T> 也是T 是 Send,那么 Mutex<T> 是 Send + Sync编译器自动推导,你通常不需要手动实现。
use std::sync::Mutex;
fn main() {
let m = Mutex::new();
// 获取锁
{
let mut num = m.lock().unwrap();
*num = ; // 修改数据
println!("m = {}", *num);
} // num 离开作用域,锁自动释放
println!("最终 m = {}", *m.lock().unwrap());
}
输出:
m = 6
最终 m = 6
关键点:
lock() 返回 LockResult<MutexGuard<T>>MutexGuard 实现了 Deref,可以像引用一样使用MutexGuard 离开作用域时,锁自动释放(RAII 模式)* 解引用访问内部数据use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 创建 Arc<Mutex<i32>>
let counter = Arc::new(Mutex::new());
let mut handles = vec![];
// 创建 10 个线程
for _ in .. {
// 克隆 Arc(不是克隆数据)
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 获取锁
let mut num = counter.lock().unwrap();
*num += ;
println!("线程增加了计数器:{}", *num);
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
println!("最终计数器:{}", *counter.lock().unwrap());
}
输出:
线程增加了计数器:1
线程增加了计数器:2
...
线程增加了计数器:10
最终计数器:10
为什么需要 Arc?
Mutex<T> 不是 Copy 的Arc 提供共享所有权,每个线程持有一个引用use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![, , ]));
let mut handles = vec![];
// 5 个读者线程
for i in .. {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let read_guard = data.read().unwrap();
println!("读者 {} 看到:{:?}", i, *read_guard);
// read_guard 离开作用域,读锁释放
});
handles.push(handle);
}
// 1 个写者线程
let data = Arc::clone(&data);
let write_handle = thread::spawn(move || {
let mut write_guard = data.write().unwrap();
write_guard.push();
println!("写者添加了元素 4");
// write_guard 离开作用域,写锁释放
});
handles.push(write_handle);
for handle in handles {
handle.join().unwrap();
}
println!("最终数据:{:?}", *data.read().unwrap());
}
输出(顺序可能不同):
读者 0 看到:[1, 2, 3]
读者 1 看到:[1, 2, 3]
...
写者添加了元素 4
最终数据:[1, 2, 3, 4]
关键点:
read() 获取读锁,返回 RwLockReadGuardwrite() 获取写锁,返回 RwLockWriteGuarduse std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let mutex1 = Arc::new(Mutex::new());
let mutex2 = Arc::new(Mutex::new());
let mutex1_clone = Arc::clone(&mutex1);
let mutex2_clone = Arc::clone(&mutex2);
// 线程 1:先锁 mutex1,再锁 mutex2
let handle1 = thread::spawn(move || {
let _lock1 = mutex1_clone.lock().unwrap();
thread::sleep(std::time::Duration::from_millis());
let _lock2 = mutex2_clone.lock().unwrap(); // 可能永远等不到
});
// 线程 2:先锁 mutex2,再锁 mutex1
let handle2 = thread::spawn(move || {
let _lock2 = mutex2.lock().unwrap();
thread::sleep(std::time::Duration::from_millis());
let _lock1 = mutex1.lock().unwrap(); // 可能永远等不到
});
handle1.join().unwrap();
handle2.join().unwrap();
}
这就是死锁! 两个线程互相等待对方释放锁,永远卡住。
如何避免? 后面会讲。
use std::sync::Mutex;
fn main() {
let m = Mutex::new();
// ❌ 直接用 unwrap 可能 panic
let mut num = m.lock().unwrap();
// ✅ 更好的做法
match m.lock() {
Ok(mut guard) => {
*guard = ;
}
Err(poisoned) => {
// 另一个线程 panic 了,锁被"毒化"
println!("锁被毒化了!");
// 可以选择恢复:poisoned.into_inner()
}
}
}
锁毒化(Poisoning): 如果一个线程在持有锁时 panic 了,锁会被标记为"毒化",防止其他线程访问可能不一致的数据。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let data = Arc::new(Mutex::new(vec![, , ]));
// ❌ 错误:锁的粒度过大
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut guard = data_clone.lock().unwrap();
// 持有锁的同时做耗时操作
thread::sleep(Duration::from_secs());
guard.push();
// 锁持有时间过长,其他线程都要等
});
handle.join().unwrap();
}
修复: 缩小锁的范围,只在必要时持有锁。
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn main() {
let data = Arc::new(RwLock::new());
// 持续有读者,写者可能永远拿不到锁
for i in .. {
let data = Arc::clone(&data);
thread::spawn(move || {
loop {
let _read = data.read().unwrap();
thread::sleep(Duration::from_millis());
// 读者不断来,写者饿死了
}
});
}
// 写者可能永远等不到锁
let mut write = data.write().unwrap();
*write = ;
}
修复: 限制读者的频率,或者使用更公平的实现。
use std::sync::{Arc, Mutex};
use std::thread;
pub struct Counter {
value: Mutex<i32>,
}
impl Counter {
pub fn new() -> Self {
Counter {
value: Mutex::new(),
}
}
pub fn increment(&self) {
let mut value = self.value.lock().unwrap();
*value += ;
}
pub fn get(&self) -> i32 {
*self.value.lock().unwrap()
}
}
fn main() {
let counter = Arc::new(Counter::new());
let mut handles = vec![];
for _ in .. {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in .. {
counter.increment();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数:{}", counter.get()); // 1000
}
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
pub struct Cache {
data: RwLock<HashMap<String, String>>,
}
impl Cache {
pub fn new() -> Self {
Cache {
data: RwLock::new(HashMap::new()),
}
}
pub fn get(&self, key: &str) -> Option<String> {
let read_guard = self.data.read().unwrap();
read_guard.get(key).cloned()
}
pub fn set(&self, key: String, value: String) {
let mut write_guard = self.data.write().unwrap();
write_guard.insert(key, value);
}
}
fn main() {
let cache = Arc::new(Cache::new());
// 写者线程
let cache_writer = Arc::clone(&cache);
let writer = thread::spawn(move || {
for i in .. {
cache_writer.set(
format!("key_{}", i),
format!("value_{}", i),
);
thread::sleep(Duration::from_millis());
}
});
// 多个读者线程
let mut readers = vec![];
for r in .. {
let cache_reader = Arc::clone(&cache);
let reader = thread::spawn(move || {
for _ in .. {
for i in .. {
if let Some(value) = cache_reader.get(&format!("key_{}", i)) {
println!("读者 {} 读到:{} = {}", r, i, value);
}
}
thread::sleep(Duration::from_millis());
}
});
readers.push(reader);
}
writer.join().unwrap();
for reader in readers {
reader.join().unwrap();
}
}
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
pub struct WorkQueue<T> {
queue: Mutex<VecDeque<T>>,
not_empty: Condvar,
}
impl<T> WorkQueue<T> {
pub fn new() -> Self {
WorkQueue {
queue: Mutex::new(VecDeque::new()),
not_empty: Condvar::new(),
}
}
pub fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
self.not_empty.notify_one(); // 通知等待的消费者
}
pub fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
// 如果队列为空,等待
while queue.is_empty() {
// 释放锁并等待,被通知后重新获取锁
queue = self.not_empty.wait(queue).unwrap();
}
queue.pop_front()
}
}
fn main() {
let queue = Arc::new(WorkQueue::new());
// 生产者
let producer = Arc::clone(&queue);
let producer_handle = thread::spawn(move || {
for i in .. {
println!("生产:{}", i);
producer.push(i);
thread::sleep(Duration::from_millis());
}
});
// 消费者
let consumer = Arc::clone(&queue);
let consumer_handle = thread::spawn(move || {
for _ in .. {
if let Some(item) = consumer.pop() {
println!("消费:{}", item);
}
}
});
producer_handle.join().unwrap();
consumer_handle.join().unwrap();
}

lock() 获取锁,MutexGuard 自动释放。Arc<Mutex<T>> 是标准组合。下篇预告: 有了这些工具,如何构建高效的并发程序?线程池怎么实现?死锁如何预防?下篇我们学习并发模式和最佳实践,让你写出既安全又高效的并发代码!