首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >30-Rust 教程 - 消息传递

30-Rust 教程 - 消息传递

作者头像
LarryLan
发布2026-06-03 18:20:26
发布2026-06-03 18:20:26
740
举报

消息传递

"别共享内存了,互相打电话吧!" —— Rust 并发哲学的核心

🎬 引入

还记得上篇我们说的厨房类比吗?多个厨师(线程)同时做菜,如果都去同一个冰箱(共享内存)拿食材,很容易撞车。

那怎么办呢?有两种方案:

方案 1:共享状态

  • 冰箱上挂个牌子:"我正在用,别人别动"
  • 每次拿东西前先挂牌子,用完再摘下来
  • 问题:万一有人忘了摘牌子呢?或者两个人同时挂牌子呢?

方案 2:消息传递

  • 每个厨师有自己的食材箱
  • 需要食材时,给仓库管理员发消息:"我要两个鸡蛋"
  • 管理员把鸡蛋送过来
  • 完美!没有争抢,只有通信

Rust 推崇方案 2——消息传递。这是 Go 语言的名言 "Don't communicate by sharing memory, share memory by communicating" 的 Rust 版本。

今天我们就来学习 Rust 的消息传递机制:channel(通道)、mpsc(多生产者单消费者)、以及如何实现 Actor 模式。

📌 核心概念

什么是消息传递?

消息传递的核心思想是:线程之间不直接共享数据,而是通过发送和接收消息来通信

生活化类比:

想象一个公司:

  • 生产者 = 员工(可以多个)
  • channel = 公司内部邮箱系统
  • 消费者 = 老板(通常一个,处理所有邮件)

员工把报告放进邮箱,老板从邮箱取报告。员工和老板不需要直接见面,也不需要共享同一个办公桌。

channel 是什么?

channel 是 Rust 标准库提供的消息传递机制。它就像一根管道:

代码语言:javascript
复制
[生产者] ---> [channel] ---> [消费者]
  • 生产者调用 send() 把数据放进管道
  • 消费者调用 recv() 从管道取出数据
  • 数据的所有权被转移,不是复制

mpsc 是什么?

mpsc 是 multiple producer, single consumer(多生产者单消费者)的缩写。

  • 多生产者:可以有多个线程发送消息
  • 单消费者:只有一个线程接收消息

为什么是单消费者?因为如果多个消费者同时从同一个 channel 收消息,可能会导致消息被重复处理或者丢失。

Rust 编译器: "多个消费者?那消息到底归谁?我不允许这种 ambiguity!"

💻 代码示例

基础示例:单生产者单消费者

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    // 1. 创建 channel
    // tx = transmitter (发送端)
    // rx = receiver (接收端)
    let (tx, rx) = mpsc::channel();
    
    // 2. 创建生产者线程
    thread::spawn(move || {
        let msg = String::from("你好,消费者!");
        tx.send(msg).unwrap();
        // msg 的所有权已经转移给 channel 了
    });
    
    // 3. 消费者接收消息
    // recv 会阻塞,直到有消息到达
    let received = rx.recv().unwrap();
    println!("收到:{}", received);
}

输出:

代码语言:javascript
复制
收到:你好,消费者!

关键点:

  • mpsc::channel() 返回一对端点:发送端 tx 和接收端 rx
  • send() 发送消息,返回 Result<(), SendError<T>>
  • recv() 接收消息,会阻塞直到有消息,返回 Result<T, RecvError>
  • 消息的所有权被转移,不是复制

多个消息

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let messages = vec![
            String::from("消息 1"),
            String::from("消息 2"),
            String::from("消息 3"),
        ];
        
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis());
        }
    });
    
    // 方式 1:阻塞接收
    // for received in rx {
    //     println!("收到:{}", received);
    // }
    
    // 方式 2:非阻塞接收
    loop {
        match rx.recv_timeout(Duration::from_millis()) {
            Ok(msg) => println!("收到:{}", msg),
            Err(_) => {
                println!("超时,没有新消息");
                break;
            }
        }
    }
}

多生产者

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 生产者 1
    let tx1 = tx.clone();  // 克隆发送端
    thread::spawn(move || {
        for i in ..= {
            tx1.send(format!("生产者 1: 消息 {}", i)).unwrap();
            thread::sleep(Duration::from_millis());
        }
    });
    
    // 生产者 2
    let tx2 = tx.clone();
    thread::spawn(move || {
        for i in ..= {
            tx2.send(format!("生产者 2: 消息 {}", i)).unwrap();
            thread::sleep(Duration::from_millis());
        }
    });
    
    // 原始的 tx 在这里会 drop,不影响其他克隆
    
    // 消费者接收所有消息
    for received in rx {
        println!("收到:{}", received);
    }
}

输出(顺序可能不同):

代码语言:javascript
复制
收到:生产者 1: 消息 1
收到:生产者 2: 消息 1
收到:生产者 1: 消息 2
收到:生产者 2: 消息 2
收到:生产者 1: 消息 3
收到:生产者 2: 消息 3

关键点:

  • tx.clone() 创建多个发送端
  • 所有发送端共享同一个 channel
  • 当所有发送端都 drop 后,recv() 会返回错误

不同类型的消息

channel 可以发送任何类型的数据,包括枚举:

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;

// 定义消息类型
enum Message {
    Text(String),
    Number(i32),
    Quit,
}

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        tx.send(Message::Text(String::from("你好"))).unwrap();
        tx.send(Message::Number()).unwrap();
        tx.send(Message::Quit).unwrap();
    });
    
    for msg in rx {
        match msg {
            Message::Text(text) => println!("文本:{}", text),
            Message::Number(num) => println!("数字:{}", num),
            Message::Quit => {
                println!("收到退出信号");
                break;
            }
        }
    }
}

🐛 常见坑点

坑点 1:发送端 drop 后继续发送

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        tx.send(String::from("你好")).unwrap();
        // tx 在这里被 drop 了
    });
    
    // 等待线程完成
    thread::sleep(std::time::Duration::from_millis());
    
    // ❌ 错误!tx 已经不存在了
    // tx.send(String::from("第二条")).unwrap();
    
    // 接收端会收到 Err,因为所有发送端都 drop 了
    match rx.recv() {
        Ok(msg) => println!("收到:{}", msg),
        Err(_) => println!("发送端已关闭"),
    }
}

坑点 2:忘记克隆发送端

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx1.send("来自线程 1").unwrap();
    });
    
    // ❌ 错误!tx 被移动到上面的线程了
    // thread::spawn(move || {
    //     tx.send("来自线程 2").unwrap();
    // });
    
    // ✅ 正确:再克隆一次
    let tx2 = tx.clone();
    thread::spawn(move || {
        tx2.send("来自线程 2").unwrap();
    });
    
    for _ in .. {
        println!("收到:{}", rx.recv().unwrap());
    }
}

坑点 3:recv 阻塞导致死锁

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 消费者先开始等待
    thread::spawn(move || {
        println!("等待消息...");
        let msg = rx.recv().unwrap();  // 阻塞
        println!("收到:{}", msg);
    });
    
    // 生产者...等等,生产者在另一个线程
    // 而主线程在这里卡住了,因为没有生产者!
    
    // ❌ 死锁!需要创建生产者线程
    thread::spawn(move || {
        tx.send("你好").unwrap();
    });
    
    thread::sleep(std::time::Duration::from_secs());
}

🎯 实战案例

案例 1:日志收集器

多个工作线程产生日志,一个专门的线程负责写入文件:

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::fs::OpenOptions;
use std::io::Write;

enum LogMessage {
    Info(String),
    Warning(String),
    Error(String),
}

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 日志写入线程(消费者)
    let log_handle = thread::spawn(move || {
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open("app.log")
            .unwrap();
        
        for msg in rx {
            let log_line = match msg {
                LogMessage::Info(s) => format!("[INFO] {}\n", s),
                LogMessage::Warning(s) => format!("[WARN] {}\n", s),
                LogMessage::Error(s) => format!("[ERROR] {}\n", s),
            };
            
            writeln!(file, "{}", log_line).unwrap();
            print!("{}", log_line);  // 同时也输出到控制台
        }
    });
    
    // 模拟多个工作线程
    for i in ..= {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            for j in ..= {
                tx_clone.send(LogMessage::Info(
                    format!("工作线程 {} - 任务 {}", i, j)
                )).unwrap();
                thread::sleep(Duration::from_millis());
            }
        });
    }
    
    // 等待所有工作完成
    thread::sleep(Duration::from_secs());
    
    // drop 所有发送端,日志线程会退出
    drop(tx);
    log_handle.join().unwrap();
}

案例 2:Actor 模式

Actor 模式是一种并发设计模式:每个 Actor 是一个独立的实体,有自己的状态,通过消息与其他 Actor 通信。

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;
use std::collections::HashMap;

// Actor 消息
enum ActorMessage {
    Get(String),
    Set(String, String),
    Delete(String),
    Stop,
}

// Actor 响应
enum ActorResponse {
    Value(Option<String>),
    Done,
}

// 简单的键值存储 Actor
fn kv_store_actor(rx: mpsc::Receiver<ActorMessage>) {
    let mut store = HashMap::new();
    
    loop {
        match rx.recv() {
            Ok(ActorMessage::Get(key)) => {
                let response = store.get(&key).cloned();
                println!("Get {:?} -> {:?}", key, response);
            }
            Ok(ActorMessage::Set(key, value)) => {
                store.insert(key, value);
                println!("Set 完成");
            }
            Ok(ActorMessage::Delete(key)) => {
                store.remove(&key);
                println!("Delete {:?} 完成", key);
            }
            Ok(ActorMessage::Stop) => {
                println!("Actor 停止");
                break;
            }
            Err(_) => break,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 启动 Actor
    let actor_handle = thread::spawn(move || {
        kv_store_actor(rx);
    });
    
    // 与 Actor 交互
    tx.send(ActorMessage::Set("name".to_string(), "Larry".to_string())).unwrap();
    tx.send(ActorMessage::Set("age".to_string(), "25".to_string())).unwrap();
    tx.send(ActorMessage::Get("name".to_string())).unwrap();
    tx.send(ActorMessage::Delete("age".to_string())).unwrap();
    tx.send(ActorMessage::Stop).unwrap();
    
    actor_handle.join().unwrap();
}

输出:

代码语言:javascript
复制
Set 完成
Set 完成
Get "name" -> Some("Larry")
Delete "age" 完成
Actor 停止

案例 3:工作池模式

代码语言:javascript
复制
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

enum Job {
    Process(i32),
    Shutdown,
}

fn worker(id: usize, rx: std::sync::Arc<std::sync::Mutex<mpsc::Receiver<Job>>>) {
    loop {
        // 需要 Mutex 因为多个线程共享同一个 receiver
        let job = {
            let rx = rx.lock().unwrap();
            rx.recv().unwrap()
        };
        
        match job {
            Job::Process(n) => {
                println!("工人 {} 处理任务 {}", id, n);
                thread::sleep(Duration::from_millis());
                println!("工人 {} 完成任务 {}", id, n);
            }
            Job::Shutdown => {
                println!("工人 {} 收到 shutdown", id);
                break;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));
    
    // 创建 3 个工作线程
    let mut workers = vec![];
    for i in .. {
        let rx_clone = std::sync::Arc::clone(&rx);
        let handle = thread::spawn(move || {
            worker(i, rx_clone);
        });
        workers.push(handle);
    }
    
    // 发送任务
    for i in .. {
        tx.send(Job::Process(i)).unwrap();
    }
    
    // 发送 shutdown 信号
    for _ in .. {
        tx.send(Job::Shutdown).unwrap();
    }
    
    // 等待所有工人完成
    for handle in workers {
        handle.join().unwrap();
    }
}

🧠 思维导图

30-消息传递
30-消息传递

📝 小结

  1. 消息传递优于共享状态——避免数据竞争,代码更清晰。
  2. mpsc channel 支持多生产者单消费者,用 clone() 创建多个发送端。
  3. recv() 会阻塞,可以用 recv_timeout()try_recv() 避免死锁。
  4. 消息所有权被转移,不是复制,高效且安全。
  5. Actor 模式 是消息传递的经典应用,每个 Actor 维护自己的状态。

下篇预告: 如果就是需要共享状态怎么办?比如多个线程需要读写同一个计数器?别急,下篇我们学习 Mutex、RwLock、Arc,以及 Rust 如何通过类型系统保证共享状态的安全!

🔗 参考资料

  • Rust Book - mpsc
  • std::sync::mpsc 文档
  • Actor 模式
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Larry的Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息传递
    • 🎬 引入
    • 📌 核心概念
      • 什么是消息传递?
      • channel 是什么?
      • mpsc 是什么?
    • 💻 代码示例
      • 基础示例:单生产者单消费者
      • 多个消息
      • 多生产者
      • 不同类型的消息
    • 🐛 常见坑点
      • 坑点 1:发送端 drop 后继续发送
      • 坑点 2:忘记克隆发送端
      • 坑点 3:recv 阻塞导致死锁
    • 🎯 实战案例
      • 案例 1:日志收集器
      • 案例 2:Actor 模式
      • 案例 3:工作池模式
    • 🧠 思维导图
    • 📝 小结
    • 🔗 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档