前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust并发控制之Channel

Rust并发控制之Channel

作者头像
newbmiao
发布2023-12-13 15:21:49
2000
发布2023-12-13 15:21:49
举报
文章被收录于专栏:学点Rust学点Rust

Rust 官方sync包中提供了mpsc模式的 (多生产者,单消费者:multi-producer, single-consumer) channel,可以实现基于消息并发控制,而不是依赖控制内存共享(加锁)。这正是 go 语言作者 R. Pike 所推崇的方式:

Don't communicate by sharing memory; share memory by communicating. (R. Pike)

今天就聊聊mpsc提供的sync_channelchannel

文章目录

  • 规则
  • sync_channel - spsc
  • sync_channel - mpsc
  • channel
  • 并发安全

规则

首先一般 channel 机制都保证了

  • 生产者(producer/sender) 可以发送(send)消息,消费者(consumer/receiver)可以接受(recv)消息,生产和消费的顺序一致(一般都有消息队列保证顺序FIFO
  • 消费者在没有消息可接收前会阻塞等待,直到有消息或 channel 关闭
  • channel 可以限制同时可处理消息上限(buffer size)
  • 生产者发送的消息累积到 buffer 上限时就要阻塞到有消息被消费

从这些规则中,可以看出,channel 保证了生产总是先于消费,消息处理总是先进先出(FIFO)。

sync_channel - spsc

buffer size 最特别的情况就是 0,就是单生产者单消费者模式(mpsc):send 后会阻塞,直到有 recv 处理,才能再 send 下一个消息。

这就能很好的实现对并发顺序的控制, 比如下边代码,用两组 channel 实现 1 和 2 的交替打印。

不同 channel 的 send 和 recv 交叉等待,保证了打印的顺序,就像这中间持有锁一样

代码语言:javascript
复制
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (sender, receiver) = sync_channel(0);
    let (sender2, receiver2) = sync_channel(0);

    let cnt = 3;
    let t1 = thread::spawn({
        move || {
            for _ in 0..cnt {
                print!("1 ");
                // t1打印完,通知t2的receiver打印
                sender.send(2).unwrap();
                // 阻塞,等待t2打印结束
                receiver2.recv().unwrap();
            }
        }
    });

    let t2 = thread::spawn({
        move || {
            for _ in 0..cnt {
                // 阻塞,等待t1 sender的已打印的消息
                receiver.recv().unwrap();
                print!("2 ");
                // t2打印完, 给t1 receiver2通知可以进行下一次打印
                sender2.send(1).unwrap();
            }
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

sync_channel - mpsc

buffer size 增加,就是正常mpsc摸式,可以控制同时能并发的上限(实际内部提前分配了数组来支持 buffer)。

达到上限,sender 就需要等待有 receiver 消费才能够继续发送消息。

当然没消息的话,别忘了 drop 也是可以结束 recv 一直等待消息的。

如下边代码所示:

代码语言:javascript
复制
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (sender, receiver) = sync_channel(3);

    let sender2 = sender.clone();
    let sender3 = sender.clone();
    thread::spawn(move || sender.send(1).unwrap());
    thread::spawn(move || sender2.clone().send(2).unwrap());

    drop(sender3); // 这里保证了第三个recv打印能成功
    println!("{:?}", receiver.recv().unwrap());
    println!("{:?}", receiver.recv().unwrap());
    println!("{:?}", receiver.recv());
}

channel

明白了sync_channelchannel就简单了,就是 buffer size 无限模式(实际是内部维护了一个链表自动扩容)。所有的 send 都不会阻塞,只有 recv 在没消息时需要阻塞等待 channel 中产生新的消息。

代码语言:javascript
复制
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

fn main() {
    // let (sender, receiver) = sync_channel(1); // buffer为1的话,不会打印send no block
    let (sender, receiver) = channel(); // 使用channel,send不阻塞,会打印

    thread::spawn(move || {
        sender.send(1).unwrap();
        sender.send(2).unwrap();
        sender.send(3).unwrap();
        println!("send no block");
    });

    println!("{:?}", receiver.recv().unwrap());
}

如果想及时 check 是否能 recv 消息时,可以用try_recv

  • TryRecvError::Empty代表目前为空,但 channel 连接还在
  • TryRecvError::Disconnected则是连接已关闭,不可能再受到消息了
代码语言:javascript
复制
use std::sync::mpsc::{channel, Receiver, RecvError, TryRecvError};

fn main() {
    let (sender, receiver) = channel();
    fn try_recv_with_log(receiver: &Receiver<i32>) {
        match receiver.try_recv() {
            Ok(v) => println!("{:?}", v),
            Err(TryRecvError::Empty) => println!("error: Empty"),
            Err(TryRecvError::Disconnected) => println!("error: Disconnected"),
        }
    }
    // error: Empty
    try_recv_with_log(&receiver);
    sender.send(1).unwrap();
    receiver.recv().unwrap();
    drop(sender);
    // error: Disconnected
    try_recv_with_log(&receiver);
}

并发安全

代码语言:javascript
复制
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
impl<T> !Sync for Receiver<T> {}

最后来看看 rust 如何保证 channel 的并发安全

Sender<T>同时支持SendSync,其维护的消息队列可以安全的在线程间传递所有权,也可以了共享引用,即可以被多个线程同时进行 send 操作。

其中T需要实现Send, 以确保消息可以在线程间安全传递所有权,避免竞争条件或使用已释放的内存

Receiver<T>只支持 Send,只能在线程间传递自身所有权,但不能在线程间共享引用。同时只能有一个线程拥有其所有权,进而独占的去消费Sender<T>的消息队列。

依旧是巧妙的通过SendSync标记 trait 保证了并发的安全,轻松实现无畏并发。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-12-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 菜鸟Miao 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 规则
  • sync_channel - spsc
  • sync_channel - mpsc
  • channel
  • 并发安全
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档