
"别共享内存了,互相打电话吧!" —— Rust 并发哲学的核心
还记得上篇我们说的厨房类比吗?多个厨师(线程)同时做菜,如果都去同一个冰箱(共享内存)拿食材,很容易撞车。
那怎么办呢?有两种方案:
方案 1:共享状态
方案 2:消息传递
Rust 推崇方案 2——消息传递。这是 Go 语言的名言 "Don't communicate by sharing memory, share memory by communicating" 的 Rust 版本。
今天我们就来学习 Rust 的消息传递机制:channel(通道)、mpsc(多生产者单消费者)、以及如何实现 Actor 模式。
消息传递的核心思想是:线程之间不直接共享数据,而是通过发送和接收消息来通信。
生活化类比:
想象一个公司:
员工把报告放进邮箱,老板从邮箱取报告。员工和老板不需要直接见面,也不需要共享同一个办公桌。
channel 是 Rust 标准库提供的消息传递机制。它就像一根管道:
[生产者] ---> [channel] ---> [消费者]
send() 把数据放进管道recv() 从管道取出数据mpsc 是 multiple producer, single consumer(多生产者单消费者)的缩写。
为什么是单消费者?因为如果多个消费者同时从同一个 channel 收消息,可能会导致消息被重复处理或者丢失。
Rust 编译器: "多个消费者?那消息到底归谁?我不允许这种 ambiguity!"
use std::sync::mpsc;
use std::thread;
fn main() {
// 1. 创建 channel
// tx = transmitter (发送端)
// rx = receiver (接收端)
let (tx, rx) = mpsc::channel();
// 2. 创建生产者线程
thread::spawn(move || {
let msg = String::from("你好,消费者!");
tx.send(msg).unwrap();
// msg 的所有权已经转移给 channel 了
});
// 3. 消费者接收消息
// recv 会阻塞,直到有消息到达
let received = rx.recv().unwrap();
println!("收到:{}", received);
}
输出:
收到:你好,消费者!
关键点:
mpsc::channel() 返回一对端点:发送端 tx 和接收端 rxsend() 发送消息,返回 Result<(), SendError<T>>recv() 接收消息,会阻塞直到有消息,返回 Result<T, RecvError>use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
String::from("消息 1"),
String::from("消息 2"),
String::from("消息 3"),
];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis());
}
});
// 方式 1:阻塞接收
// for received in rx {
// println!("收到:{}", received);
// }
// 方式 2:非阻塞接收
loop {
match rx.recv_timeout(Duration::from_millis()) {
Ok(msg) => println!("收到:{}", msg),
Err(_) => {
println!("超时,没有新消息");
break;
}
}
}
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 生产者 1
let tx1 = tx.clone(); // 克隆发送端
thread::spawn(move || {
for i in ..= {
tx1.send(format!("生产者 1: 消息 {}", i)).unwrap();
thread::sleep(Duration::from_millis());
}
});
// 生产者 2
let tx2 = tx.clone();
thread::spawn(move || {
for i in ..= {
tx2.send(format!("生产者 2: 消息 {}", i)).unwrap();
thread::sleep(Duration::from_millis());
}
});
// 原始的 tx 在这里会 drop,不影响其他克隆
// 消费者接收所有消息
for received in rx {
println!("收到:{}", received);
}
}
输出(顺序可能不同):
收到:生产者 1: 消息 1
收到:生产者 2: 消息 1
收到:生产者 1: 消息 2
收到:生产者 2: 消息 2
收到:生产者 1: 消息 3
收到:生产者 2: 消息 3
关键点:
tx.clone() 创建多个发送端recv() 会返回错误channel 可以发送任何类型的数据,包括枚举:
use std::sync::mpsc;
use std::thread;
// 定义消息类型
enum Message {
Text(String),
Number(i32),
Quit,
}
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(Message::Text(String::from("你好"))).unwrap();
tx.send(Message::Number()).unwrap();
tx.send(Message::Quit).unwrap();
});
for msg in rx {
match msg {
Message::Text(text) => println!("文本:{}", text),
Message::Number(num) => println!("数字:{}", num),
Message::Quit => {
println!("收到退出信号");
break;
}
}
}
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(String::from("你好")).unwrap();
// tx 在这里被 drop 了
});
// 等待线程完成
thread::sleep(std::time::Duration::from_millis());
// ❌ 错误!tx 已经不存在了
// tx.send(String::from("第二条")).unwrap();
// 接收端会收到 Err,因为所有发送端都 drop 了
match rx.recv() {
Ok(msg) => println!("收到:{}", msg),
Err(_) => println!("发送端已关闭"),
}
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send("来自线程 1").unwrap();
});
// ❌ 错误!tx 被移动到上面的线程了
// thread::spawn(move || {
// tx.send("来自线程 2").unwrap();
// });
// ✅ 正确:再克隆一次
let tx2 = tx.clone();
thread::spawn(move || {
tx2.send("来自线程 2").unwrap();
});
for _ in .. {
println!("收到:{}", rx.recv().unwrap());
}
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 消费者先开始等待
thread::spawn(move || {
println!("等待消息...");
let msg = rx.recv().unwrap(); // 阻塞
println!("收到:{}", msg);
});
// 生产者...等等,生产者在另一个线程
// 而主线程在这里卡住了,因为没有生产者!
// ❌ 死锁!需要创建生产者线程
thread::spawn(move || {
tx.send("你好").unwrap();
});
thread::sleep(std::time::Duration::from_secs());
}
多个工作线程产生日志,一个专门的线程负责写入文件:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::fs::OpenOptions;
use std::io::Write;
enum LogMessage {
Info(String),
Warning(String),
Error(String),
}
fn main() {
let (tx, rx) = mpsc::channel();
// 日志写入线程(消费者)
let log_handle = thread::spawn(move || {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open("app.log")
.unwrap();
for msg in rx {
let log_line = match msg {
LogMessage::Info(s) => format!("[INFO] {}\n", s),
LogMessage::Warning(s) => format!("[WARN] {}\n", s),
LogMessage::Error(s) => format!("[ERROR] {}\n", s),
};
writeln!(file, "{}", log_line).unwrap();
print!("{}", log_line); // 同时也输出到控制台
}
});
// 模拟多个工作线程
for i in ..= {
let tx_clone = tx.clone();
thread::spawn(move || {
for j in ..= {
tx_clone.send(LogMessage::Info(
format!("工作线程 {} - 任务 {}", i, j)
)).unwrap();
thread::sleep(Duration::from_millis());
}
});
}
// 等待所有工作完成
thread::sleep(Duration::from_secs());
// drop 所有发送端,日志线程会退出
drop(tx);
log_handle.join().unwrap();
}
Actor 模式是一种并发设计模式:每个 Actor 是一个独立的实体,有自己的状态,通过消息与其他 Actor 通信。
use std::sync::mpsc;
use std::thread;
use std::collections::HashMap;
// Actor 消息
enum ActorMessage {
Get(String),
Set(String, String),
Delete(String),
Stop,
}
// Actor 响应
enum ActorResponse {
Value(Option<String>),
Done,
}
// 简单的键值存储 Actor
fn kv_store_actor(rx: mpsc::Receiver<ActorMessage>) {
let mut store = HashMap::new();
loop {
match rx.recv() {
Ok(ActorMessage::Get(key)) => {
let response = store.get(&key).cloned();
println!("Get {:?} -> {:?}", key, response);
}
Ok(ActorMessage::Set(key, value)) => {
store.insert(key, value);
println!("Set 完成");
}
Ok(ActorMessage::Delete(key)) => {
store.remove(&key);
println!("Delete {:?} 完成", key);
}
Ok(ActorMessage::Stop) => {
println!("Actor 停止");
break;
}
Err(_) => break,
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
// 启动 Actor
let actor_handle = thread::spawn(move || {
kv_store_actor(rx);
});
// 与 Actor 交互
tx.send(ActorMessage::Set("name".to_string(), "Larry".to_string())).unwrap();
tx.send(ActorMessage::Set("age".to_string(), "25".to_string())).unwrap();
tx.send(ActorMessage::Get("name".to_string())).unwrap();
tx.send(ActorMessage::Delete("age".to_string())).unwrap();
tx.send(ActorMessage::Stop).unwrap();
actor_handle.join().unwrap();
}
输出:
Set 完成
Set 完成
Get "name" -> Some("Larry")
Delete "age" 完成
Actor 停止
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
enum Job {
Process(i32),
Shutdown,
}
fn worker(id: usize, rx: std::sync::Arc<std::sync::Mutex<mpsc::Receiver<Job>>>) {
loop {
// 需要 Mutex 因为多个线程共享同一个 receiver
let job = {
let rx = rx.lock().unwrap();
rx.recv().unwrap()
};
match job {
Job::Process(n) => {
println!("工人 {} 处理任务 {}", id, n);
thread::sleep(Duration::from_millis());
println!("工人 {} 完成任务 {}", id, n);
}
Job::Shutdown => {
println!("工人 {} 收到 shutdown", id);
break;
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));
// 创建 3 个工作线程
let mut workers = vec![];
for i in .. {
let rx_clone = std::sync::Arc::clone(&rx);
let handle = thread::spawn(move || {
worker(i, rx_clone);
});
workers.push(handle);
}
// 发送任务
for i in .. {
tx.send(Job::Process(i)).unwrap();
}
// 发送 shutdown 信号
for _ in .. {
tx.send(Job::Shutdown).unwrap();
}
// 等待所有工人完成
for handle in workers {
handle.join().unwrap();
}
}

clone() 创建多个发送端。recv() 会阻塞,可以用 recv_timeout() 或 try_recv() 避免死锁。下篇预告: 如果就是需要共享状态怎么办?比如多个线程需要读写同一个计数器?别急,下篇我们学习 Mutex、RwLock、Arc,以及 Rust 如何通过类型系统保证共享状态的安全!