前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust并发控制之Condvar

Rust并发控制之Condvar

作者头像
newbmiao
发布2023-11-27 12:32:43
2550
发布2023-11-27 12:32:43
举报
文章被收录于专栏:学点Rust学点Rust

上次提到的 Barrier 用到了 Rust 的 condvar 和 mutex,今天来看下 condvar 的用法。

文章目录

  • 唤醒顺序不保证
  • 虚假唤醒

condvar 即 condition variable(条件变量),是一种线程同步的方式,用于线程间的通信。它可以阻塞(wait)线程,期间不消耗 CPU,直到某个时间发生唤醒(notify)线程。

代码举例来说:

代码语言:javascript
复制
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        // We notify the condvar that the value has changed.
        cvar.notify_one();
    });

    // Wait for the thread to start up.
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
}

代码中,创建一个线程在修改 started 变量后唤醒等待的线程。main 中等待的的线程会一直阻塞(wait)直到 started 的值被修改。

其中 wait 会需要一个锁的 MutexGuard 来配合,wait 会自动释放锁,并阻塞当前线程,直到被唤醒时重新获取锁,并返回锁的 MutexGuard,来获取锁当前保护的值

Tips: MutexGuard 实现了销毁时自动释放锁和可以通过解引用(deref)到它保护的值

这里有两个有意思的点:

  • 为什么要和 mutex 一起使用?
  • 为什么唤醒时要检查条件是否满足?

这个要从 condvar 唤醒的机制说起。

唤醒顺序不保证

先来看下唤醒的顺序,我们起两批同样数目的线程,一批线程每个线程会修改一次变量并唤醒一个另一批等待的线程,为了观测唤醒顺序,代码如下:

代码语言:javascript
复制
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self};

struct SharedData {
    counter: Mutex<usize>,
    condvar: Condvar,
}

fn main() {
    let shared_data = Arc::new(SharedData {
        counter: Mutex::new(0),
        condvar: Condvar::new(),
    });
    let thread_num = 5;
    let mut workers = Vec::new();
    let mut waits = Vec::new();

    for i in 0..thread_num {
        do_wait(i, Arc::clone(&shared_data), &mut waits);
    }
    for i in 0..thread_num {
        do_work(i, Arc::clone(&shared_data), &mut workers)
    }
    waits.into_iter().for_each(|w| w.join().unwrap());
    workers.into_iter().for_each(|w| w.join().unwrap());
}

fn do_work(i: i32, data: Arc<SharedData>, workers: &mut Vec<thread::JoinHandle<()>>) {
    workers.push(thread::spawn(move || {
        let SharedData { counter, condvar } = &*data;
        let mut data = counter.lock().unwrap();
        *data += 1;
        println!("Woker thread {} before notify: Counter {}", i, data);
        condvar.notify_one();
    }));
}
fn do_wait(i: i32, data: Arc<SharedData>, waits: &mut Vec<thread::JoinHandle<()>>) {
    waits.push(thread::spawn(move || {
        let SharedData { counter, condvar } = &*data;
        let mut data = counter.lock().unwrap();
        data = condvar.wait(data).unwrap();
        println!("   Wait thread {} after wake up: Counter {}", i, data);
    }));
}

运行结果不唯一,比如如下结果,五次修改触发了五次唤醒,但是 wait 唤醒顺序不一定是按照 worker 修改顺序(而修改顺序是符合预期的,因为是加锁保证的):

代码语言:javascript
复制
Woker thread 0 before notify: Counter 1
Woker thread 4 before notify: Counter 2
Woker thread 2 before notify: Counter 3
   Wait thread 1 after wake up: Counter 3
   Wait thread 3 after wake up: Counter 3
Woker thread 3 before notify: Counter 4
   Wait thread 0 after wake up: Counter 4
Woker thread 1 before notify: Counter 5
   Wait thread 4 after wake up: Counter 5
   Wait thread 2 after wake up: Counter 5

甚至有可能是唤醒次数少于五次,导致有些线程一直阻塞,比如如下结果,只有四次唤醒,导致有 1 个线程一直阻塞:

代码语言:javascript
复制
Woker thread 1 before notify: Counter 1
   Wait thread 2 after wake up: Counter 1
Woker thread 3 before notify: Counter 2
Woker thread 0 before notify: Counter 3
   Wait thread 4 after wake up: Counter 3
Woker thread 4 before notify: Counter 4
   Wait thread 3 after wake up: Counter 4
   Wait thread 1 after wake up: Counter 4
Woker thread 2 before notify: Counter 5
# 有一个线程一直阻塞在这里

为什么顺序不保证呢?condvar 实现是基于操作系统的条件变量实现,顺序取决于操作系统调度时当前可唤醒的线程是哪个,要保证唤醒顺序需要额外的开销,而这个开销是不必要的,因为唤醒顺序对于线程间的通信是没有意义的,所以底层实现并不保证唤醒顺序。这里[1]有相关讨论

所以多个线程等待同一条件变量时,notify_one 唤醒和等待也不是一定是一对一的调用,每次唤醒也不能保证都是不同的等待线程。

至于为什么会有线程一直阻塞的情况,是因为唤醒次数少于等待次数,导致有些线程一直阻塞。 因为是多线程并发构建的 notify_one 和 wait,存在调用 notify_one 时没有线程在等待的可能,导致唤醒次数少于等待次数的情况。

虚假唤醒

还有就是虚假唤醒,即 wait 返回时,条件由于并发原因已经不满足,还可能因为唤醒并不是由于显示的 notify 调用,这个听起来很奇怪,但不是一个 bug,是底层操作系统实现导致的,具体看看wiki[2]上的说明吧。

综上这两点,condvar 唤醒时是需要重新检查条件是否依旧满足,而且需要和 mutex 一起使用,来确保条件值获取的并发安全。

除此 condvar 还有一些方便的方法,比如提供了

  • notify_all 来广播唤醒所有等待的线程;
  • wait_while 可以根据条件等待条件直到满足;
  • wait_timeout 只等待一段时间如果不能及时被唤醒。

官方文档都有例子,就不展开了。

关于 condvar 比较实际的例子有 WaitGroup,不需要像 Barrier 一样初始化时指定线程数量,而是在运行时动态增加线程数量,在crossbeam-utils[3]中有实现,代码很精炼,感兴趣可以看下

参考资料

[1]

这里: https://www.reddit.com/r/C_Programming/comments/12itrvd/condition_variables_wakeup_ordering/

[2]

wiki: https://en.wikipedia.org/wiki/Spurious_wakeup

[3]

crossbeam-utils: https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-utils/src/sync/wait_group.rs


推荐阅读

如果有用,点个 在看,让更多人看到

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-11-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 菜鸟Miao 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 唤醒顺序不保证
  • 虚假唤醒
    • 参考资料
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档