首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【Linux系统】线程同步

【Linux系统】线程同步

作者头像
Ronin305
发布2025-12-22 13:29:20
发布2025-12-22 13:29:20
230
举报
文章被收录于专栏:我的博客我的博客

在上一章节中,我们使用互斥量之后,确实解决了数据竞争问题,但出现了新的问题:只有一个线程(thread 1)在处理所有售票任务。这展示了互斥量的一个局限性:它确保了线程安全,但不保证公平性。

1. 条件变量

根据这个问题,我们可以引入条件变量(Condition Variable) 。条件变量允许线程在特定条件不满足时主动等待,而不是忙等待或不公平地竞争锁。

为什么会出现只有一个线程工作的情况?

在输出中,只有thread 1在处理所有售票,这是因为:

  1. 锁竞争的不公平性:当一个线程释放锁后,它可能立即又重新获取锁,而其他线程没有机会获取
  2. 调度策略:操作系统的线程调度可能优先调度刚刚释放锁的线程
  3. 没有等待机制:线程在无法获取票时没有等待,而是继续竞争锁

1.1 条件变量的基本概念

条件变量是一种同步机制,允许线程在某个条件不满足时挂起等待,直到其他线程改变条件并通知它。

条件变量的主要操作:
  1. 等待pthread_cond_wait(cond, mutex)
    • 原子性地释放互斥锁并进入等待状态
    • 被唤醒后重新获取互斥锁
  2. 信号pthread_cond_signal(cond)
    • 唤醒一个等待该条件变量的线程
  3. 广播pthread_cond_broadcast(cond)
    • 唤醒所有等待该条件变量的线程

1.2 条件变量函数

初始化条件变量
代码语言:javascript
复制
int pthread_cond_init(pthread_cond_t *restrict cond, 
                     const pthread_condattr_t *restrict attr);

参数说明

  • cond:指向要初始化的条件变量的指针
  • attr:条件变量属性,通常为NULL表示使用默认属性

返回值:成功返回0,失败返回错误码

使用方式

代码语言:javascript
复制
// 动态初始化
pthread_cond_t cond;
if (pthread_cond_init(&cond, NULL) != 0) {
    // 处理错误
    perror("Failed to initialize condition variable");
    exit(EXIT_FAILURE);
}

// 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁条件变量
代码语言:javascript
复制
int pthread_cond_destroy(pthread_cond_t *cond);

参数说明

  • cond:要销毁的条件变量

返回值:成功返回0,失败返回错误码

注意事项

  • 只有在没有线程等待该条件变量时才能安全销毁
  • 静态初始化的条件变量不需要销毁
  • 销毁后不应再使用该条件变量
等待条件满足
代码语言:javascript
复制
int pthread_cond_wait(pthread_cond_t *restrict cond,
                     pthread_mutex_t *restrict mutex);

参数说明

  • cond:要等待的条件变量
  • mutex:与条件变量关联的互斥锁

返回值:成功返回0,失败返回错误码

关键特性

  1. 原子操作pthread_cond_wait 会原子性地执行以下操作:
    • 释放互斥锁 mutex
    • 将线程添加到条件变量的等待队列中
    • 使线程进入等待状态
  2. 唤醒后的操作:当线程被唤醒时,它会:
    • 重新获取互斥锁 mutex
    • 从 pthread_cond_wait 返回
  3. 虚假唤醒:线程可能会在没有收到明确信号的情况下被唤醒,因此必须在循环中检查条件
唤醒等待的线程
唤醒单个线程
代码语言:javascript
复制
int pthread_cond_signal(pthread_cond_t *cond);

功能:唤醒至少一个等待该条件变量的线程(具体唤醒哪个线程取决于调度策略)

使用场景:当只有一个线程需要被唤醒时使用,效率较高

唤醒所有线程
代码语言:javascript
复制
int pthread_cond_broadcast(pthread_cond_t *cond);

功能:唤醒所有等待该条件变量的线程

使用场景

  • 当多个线程需要被唤醒时
  • 当不确定哪个线程应该被唤醒时
  • 当条件的变化可能影响多个等待线程时
示例:
代码语言:javascript
复制
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>

#define NUM 5

int cnt = 100;

pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;

void* threadrun(void* args)
{
    std::string name = static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&glock); // 获取锁
        pthread_cond_wait(&gcond, &glock); // 等待条件变量(会自动释放锁!)

        std::cout << name << "计算: " << cnt++ << std::endl; // 被唤醒后执行任务
        pthread_mutex_unlock(&glock); // 释放锁
    }
}

int main()
{
    std::vector<pthread_t> threads;
    for(int i = 0; i < NUM; i++)
    {
        pthread_t tid;
        char* name = new char[64];
        snprintf(name, 64, "thread-%d", i+1);
        int n = pthread_create(&tid, nullptr, threadrun, name);
        if(n != 0) continue;
        threads.push_back(tid);
        sleep(1);
    }
    sleep(3);

    while(true)
    {
        std::cout << "唤醒一个线程..." << std::endl;
        pthread_cond_signal(&gcond);
        sleep(1);
    }

    for(auto& id : threads)
    {
        int n = pthread_join(id, nullptr);
        (void)n;// 返回值不做判断,基本都不会失败
    }
    return 0;
}

1. 初始化阶段

  • 静态初始化了一个互斥锁 glock 和一个条件变量 gcond

2. 线程创建阶段

  • 创建5个线程,每个线程间隔1秒启动
  • 每个线程执行 threadrun 函数,并传递线程名称作为参数

3. 线程执行阶段

这是最关键的部分,每个线程的执行流程:

  1. 获取互斥锁 (pthread_mutex_lock)
  2. 调用 pthread_cond_wait - 这个函数会:
    • 原子性地释放互斥锁(让其他线程可以获取锁)
    • 使线程进入等待状态(休眠,不消耗CPU)
    • 等待被 pthread_cond_signal 唤醒
  3. 被唤醒后,自动重新获取互斥锁,然后执行任务
  4. 释放互斥锁,然后循环回到步骤1

4. 主线程唤醒阶段

  • 主线程每隔1秒调用 pthread_cond_signal
  • 每次调用会唤醒一个等待在条件变量上的线程
条件变量函数详解

1. pthread_cond_wait(&gcond, &glock)

这是条件变量的核心函数,它的工作原理很精妙:

原子性操作

  1. 释放互斥锁 glock(让其他线程可以获取锁)
  2. 将当前线程加入到 gcond 的等待队列中
  3. 使线程进入等待状态(休眠)

当被唤醒时

  1. 重新获取互斥锁 glock(可能会阻塞,直到获取到锁)
  2. 从 pthread_cond_wait 返回,继续执行后续代码

2. pthread_cond_signal(&gcond)

  • 唤醒一个等待在条件变量 gcond 上的线程
  • 如果有多个线程在等待,具体唤醒哪个取决于调度策略
  • 不会立即让被唤醒的线程运行,只是将其从等待状态变为可运行状态

3. 为什么需要互斥锁配合?

条件变量必须与互斥锁配合使用,因为:

  1. 保护共享数据cnt++ 操作需要互斥保护
  2. 避免竞态条件:确保检查条件和进入等待是原子操作
  3. 防止虚假唤醒:在重新检查条件前保持锁的保护

运行结果:

代码语言:javascript
复制
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/TestCond$ ./test
唤醒一个线程...
thread-1计算: 100
唤醒一个线程...
thread-2计算: 101
唤醒一个线程...
thread-3计算: 102
唤醒一个线程...
thread-4计算: 103
唤醒一个线程...
thread-5计算: 104
唤醒一个线程...
thread-1计算: 105
唤醒一个线程...
thread-2计算: 106
唤醒一个线程...
thread-3计算: 107
唤醒一个线程...
thread-4计算: 108
唤醒一个线程...
thread-5计算: 109
唤醒一个线程...
thread-1计算: 110
唤醒一个线程...
thread-2计算: 111
^C

可以看到按序输出,这是因为:

  1. 线程按创建顺序依次进入等待状态
  2. pthread_cond_signal 通常按队列顺序唤醒线程(FIFO)
  3. 每次只唤醒一个线程,所以执行顺序是确定性的

2. 生产者消费者模型

2.1 超市购物比喻:理解生产者消费者模型

让我们用一个超市购物的生动例子来解释生产者消费者模型:

想象一个超市系统:

  • 生产者 = 商品供应商(如牛奶厂、面包房)
  • 消费者 = 购物顾客
  • 交易场所 = 超市货架和仓库
  • 商品 = 数据
工作流程详解
正常运营流程(缓冲区平衡状态)
  1. 供应商送货 → 商品放入货架 → 顾客购买 → 商品从货架取出
  2. 生产速度 ≈ 消费速度 → 系统平稳运行
两种阻塞场景详解

1. 当货架满时:生产者阻塞

现实场景

  • 送货卡车到达超市仓库
  • 仓库管理员:"对不起,仓库满了,请在门外稍等"
  • 卡车司机停车等待,不消耗燃油(不占用CPU)
  • 当有顾客买走商品,空出位置后:"卡车先生,现在可以卸货了!"
  • 卡车开始卸货

2. 当货架空时:消费者阻塞

现实场景

  • 顾客来到超市货架前
  • 货架空空如也:"唉,没货了,等等吧"
  • 顾客去喝咖啡休息,不浪费时间徘徊(不忙等待)
  • 当供应商补货后:"顾客们,新货到了!"
  • 顾客开始选购商品
🎯 三种关键关系

1. 生产者与生产者之间的关系:竞争关系

  • 超市例子:多个牛奶供应商都想把产品放到有限的冷藏柜中
  • 技术实现:需要互斥锁保护共享资源(货架空间)
  • 关系本质互斥 - 生产者之间需要竞争有限的缓冲区空间

2. 消费者与消费者之间的关系:竞争关系

  • 超市例子:多个顾客都想购买最后一瓶牛奶
  • 技术实现:需要互斥锁保护共享资源(商品)
  • 关系本质互斥 - 消费者之间需要竞争有限的数据/商品

3. 生产者与消费者之间的关系:同步与协作关系

  • 超市例子:顾客买走商品后,需要通知供应商补货;货架满时,供应商需要等待空位
  • 技术实现:使用条件变量进行线程间通信和同步
  • 关系本质同步 - 生产者和消费者需要协调工作节奏
👥 两种角色

1. 生产者

  • 职责:产生数据/商品并放入缓冲区
  • 特点:通常有固定的生产节奏
  • 关注点:缓冲区是否有空位

2. 消费者

  • 职责:从缓冲区取出数据/商品并进行处理
  • 特点:消费速度可能波动
  • 关注点:缓冲区是否有数据可消费
🏪 一个交易场所:缓冲区
  • 本质:一块特定结构的内存空间(通常是队列)
  • 功能
    • 解耦生产者和消费者
    • 平衡生产和消费速度差异
    • 提供临时存储

2.2 为何使用生产者消费者模型?

1. 解耦(Decoupling)

超市例子:牛奶厂不需要知道谁买了牛奶,顾客也不需要知道牛奶是哪家厂生产的。他们只关心超市这个中间平台。

技术优势

  • 生产者和消费者可以独立开发和修改
  • 系统更容易维护和扩展
  • 降低系统复杂度

2. 支持并发(Concurrency Support)

超市例子:多个供应商可以同时往不同区域补货,多个顾客可以同时在不同区域购物。

技术优势

  • 生产者线程和消费者线程可以并发执行
  • 提高系统吞吐量和资源利用率
  • 充分利用多核CPU性能

3. 支持忙闲不均(Handling Speed Mismatches)

超市例子:牛奶厂每天生产1000瓶奶,但顾客有时买得多有时买得少。超市仓库可以缓冲这种不平衡。

技术优势

  • 缓冲区可以平衡生产和消费的速度差异
  • 防止快速生产者淹没慢速消费者
  • 防止消费者等待造成资源浪费

2.3 基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中,阻塞队列(Blocking Queue)是一种线程安全的、常用于实现生产者-消费者模型的高级数据结构。与普通队列相比,阻塞队列具有以下关键特性:

  1. 阻塞特性:
  • 当队列为空时:消费者线程尝试从队列中获取元素会被阻塞,直到队列中有新元素
  • 当队列已满时:生产者线程尝试向队列中添加元素会被阻塞,直到队列中有空位
阻塞队列的实现原理

阻塞队列通常使用以下组件实现:

  1. 一个普通队列:存储元素的数据结构(数组或链表)
  2. 一个互斥锁:保护对队列的并发访问
  3. 两个条件变量
    • not_empty:当队列为空时,消费者线程等待此条件
    • not_full:当队列已满时,生产者线程等待此条件
阻塞队列的工作流程

生产者线程的工作流程

  1. 获取互斥锁
  2. 检查队列是否已满
    • 如果已满,等待not_full条件变量
    • 否则,将元素加入队列
  3. 释放互斥锁
  4. 通知消费者线程(通过not_empty条件变量)

消费者线程的工作流程

  1. 获取互斥锁
  2. 检查队列是否为空
    • 如果为空,等待not_empty条件变量
    • 否则,从队列取出元素
  3. 释放互斥锁
  4. 通知生产者线程(通过not_full条件变量)

模拟阻塞队列的生产消费模型

注意:

为便于理解,我们先以单生产者-单消费者模型为例进行讲解。初始阶段采用原生接口实现,后面再将我们之前封装好的互斥量等进行复用,

首先实现单生产-单消费模型,之后扩展为多生产-多消费模式(其实代码逻辑仍然保持不变)。

封装阻塞队列

上文已经提到了阻塞队列的原理,那么我们可以通过数据结构队列来实现,代码如下:

代码语言:javascript
复制
#include <iostream>
#include <queue>
#include <pthread.h>


const int  defaultcap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        :_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full_cond, nullptr);
        pthread_cond_init(&_empty_cond, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full_cond);
        pthread_cond_destroy(&_empty_cond);
    }
private:
    std::queue<T> _q;
    size_t _cap; // 队列容量大小

    pthread_mutex_t _mutex;
    pthread_cond_t _full_cond;
    pthread_cond_t _empty_cond;
};

生产者生产资源,消费者消费资源,本质都是对队列的增删查改等操作,也就是访问临界资源,所以互斥量是需要的,由于阻塞特性,所以条件变量也是必不可少的。

我们知道,队列为空时,消费者从队列获取数据会被阻塞,队列为满时,生产者生产数据入队列时也会被阻塞,那么我们就需要,判断队列的状态是否为空,还是满。当然这两个函数我们只需要在内部判断,不需要暴露给外部使用,可以私有

代码语言:javascript
复制
private:
    bool IsFull() { return _q.size() >= _cap; }
    bool IsEmpty() { return _q.empty(); }

生产者删除数据入队,在上文中我们已经知道了流程,不过我们在实现时引入了两个成员变量

代码语言:javascript
复制
private:
    std::queue<T> _q;
    size_t _cap; // 队列容量大小

    pthread_mutex_t _mutex;
    pthread_cond_t _full_cond;
    pthread_cond_t _empty_cond;

    int _csleep_num; // 消费者休眠的个数
    int _psleep_num; // 生产者休眠的个数
};

通过这两个成员变量判断是否有生产者或者消费者在wait阻塞休眠(队列满了或者空了),有的话我们就唤醒生产者或者消费者 

代码如下:

代码语言:javascript
复制
    // 生产者生产数据入队列
    void Enqueue(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        // 不能使用if判断,会虚假唤醒
        while(IsFull()) 
        {
            _psleep_num++;
            std::cout << "队列已满, 生产者进入休眠, 生产者休眠个数: " << _psleep_num << std::endl;
            pthread_cond_wait(&_full_cond, &_mutex);
            _psleep_num--;
        }

        // 此时队列必定有空间
        _q.push(in);

        // 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空
        // 那么就判断是否有消费者休眠,有就唤醒
        if(_csleep_num > 0)
        {
            pthread_cond_signal(&_empty_cond);
            std::cout << "唤醒消费者..." << std::endl;
        }

        // 直接唤醒其实也可以,为什么?
        //pthread_cond_signal(&_empty_cond);
        pthread_mutex_unlock(&_mutex);
        // 最后直接唤醒也行,为什么?
        //pthread_cond_signal(&_empty_cond);
    }

代码实现很简单,但是需要注意几个问题:

如果不使用这两个新增的条件变量,直接唤醒也行,或者在解锁之后直接唤醒也可以。为什么呢?

1. 为什么直接唤醒也可以?(即不管有没有消费者等待,都发送信号)

直接唤醒(无条件调用pthread_cond_signal)是可以的,但可能有性能影响

为什么可以?

  • pthread_cond_signal是一个轻量级操作
  • 如果没有线程在条件变量上等待,这个调用实际上什么都不做
  • 从功能上讲,不会造成任何错误

为什么不总是这样做?

  • 虽然单次调用开销很小,但在高并发场景下,大量不必要的信号调用会累积成可观的性能开销
  • 代码中的条件判断(if(_csleep_num > 0))是一种优化,避免了不必要的系统调用

注意:在使用条件变量阻塞等待时,会释放锁,唤醒之后会重新申请锁,但是此时也有可能锁被别人申请了,那么这个时候在申请锁时被阻塞等待。

2. 为什么在解锁后发送信号也可以?

在解锁后发送信号是完全可行且有时是更好的做法

为什么可以?

  • POSIX允许在持有或不持有互斥锁的情况下调用pthread_cond_signal
  • 条件变量的信号操作本身是线程安全的

为什么有时更好?

  1. 减少锁持有时间:先解锁再发信号,减少了互斥锁的持有时间
  2. 避免立即竞争:如果先发信号再解锁,被唤醒的线程会立即尝试获取锁,导致锁竞争
  3. 提高性能:被唤醒的线程可以立即获取到CPU时间片,而不是等待当前线程释放锁

潜在风险:

  • 如果在解锁后发送信号,需要确保状态的一致性不会被破坏
  • 在我们的例子中,由于队列操作已经完成,解锁后发送信号是安全的

3. 为什么使用 if 会造成虚假唤醒问题

问题:

pthread_cond_wait是函数调用,那么函数就有可能调用失败,万一失败,那此时队列为满并没有进行等待阻塞,而是直接push,把数据入队列,那不就出问题了吗?或者如果是多生产单消费,消费者消费完一个数据,然后广播唤醒了所有生产者,那所有生产者都会push数据,不也会出问题吗?

首先什么是虚假唤醒?

虚假唤醒是指线程在没有收到明确的信号或广播的情况下,从 pthread_cond_wait 中返回的现象。这不是 bug,而是 POSIX 标准允许的行为,原因包括:

  1. 性能优化:某些实现可能为了性能而允许虚假唤醒
  2. 信号中断:线程可能被系统信号中断
  3. 硬件因素:多处理器环境下的内存一致性模型

但是如果使用while循环判断,就不会出现这些问题,而是会重新检查 IsFull(),发现队列又满了,会再次进入等待。

消费者消费数据出队列,逻辑和生产者生产数据入队列一样,代码如下:

代码语言:javascript
复制
    // 消费者消费数据出队列
    T Pop()
    {
        pthread_mutex_lock(&_mutex);
        while(IsEmpty()) 
        {
            _csleep_num++;
            std::cout << "队列为空, 消费者进入休眠, 消费者休眠个数: " << _csleep_num << std::endl;
            pthread_cond_wait(&_empty_cond, &_mutex);
            _csleep_num--;
        }

        // 此时队列必定有空间
        T data = _q.front();
        _q.pop();

        // 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空
        // 那么就判断是否有消费者休眠,有就唤醒
        if(_psleep_num > 0)
        {
            pthread_cond_signal(&_full_cond);
            std::cout << "唤醒生产者..." << std::endl;
        }

        pthread_mutex_unlock(&_mutex);
        return data;
    }
主程序

阻塞队列已经封装好了,接下来就需要在主程序中编写,测试单生产单消费模型

代码语言:javascript
复制
#include "BlockQueue.hpp"
#include <unistd.h>

void* consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data = bq->Pop();
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void* producer(void* args)
{
    int data = 1;
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        sleep(1);
        std::cout << "生产了一个数据: " << data << std::endl;
        bq->Enqueue(data);
        data++;
    }
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>; 
    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, producer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

运行结果:

代码语言:javascript
复制
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/PC$ ./pc
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 1
唤醒消费者...
消费了一个数据: 1
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 2
唤醒消费者...
消费了一个数据: 2
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 3
唤醒消费者...
消费了一个数据: 3
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 4
唤醒消费者...
消费了一个数据: 4
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 5
唤醒消费者...
消费了一个数据: 5
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 6
唤醒消费者...
消费了一个数据: 6
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 7
唤醒消费者...
消费了一个数据: 7
队列为空, 消费者进入休眠, 消费者休眠个数: 1
生产了一个数据: 8
唤醒消费者...
消费了一个数据: 8
队列为空, 消费者进入休眠, 消费者休眠个数: 1
^C

我们也可以来试一下队列为满的情况,其他代码不变,先让消费者sleep上10秒钟,让生产者把队列push满

代码语言:javascript
复制
void* consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        sleep(10);
        int data = bq->Pop();
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void* producer(void* args)
{
    int data = 1;
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        //sleep(1);
        std::cout << "生产了一个数据: " << data << std::endl;
        bq->Enqueue(data);
        data++;
    }
}

运行结果:

代码语言:javascript
复制
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/PC$ ./pc
生产了一个数据: 1
生产了一个数据: 2
生产了一个数据: 3
生产了一个数据: 4
生产了一个数据: 5
生产了一个数据: 6
队列已满, 生产者进入休眠, 生产者休眠个数: 1
唤醒生产者...
消费了一个数据: 1
生产了一个数据: 7
队列已满, 生产者进入休眠, 生产者休眠个数: 1
唤醒生产者...
消费了一个数据: 2
生产了一个数据: 8
队列已满, 生产者进入休眠, 生产者休眠个数: 1
^C

对于多生产多消费模型,我们的阻塞队列代码并不需要改变,其实原理都是一样的,因为不管是谁访问队列,都需要互斥访问。

注意:这里使用模板是为了说明队列中不仅可以存放内置类型(如int),对象同样可以作为任务参与生产消费流程。


3. 为什么 pthread_cond_wait 需要互斥量?

基本原理

条件等待是多线程编程中实现线程同步的重要手段。它的核心逻辑是:当一个线程发现某个条件不满足时,主动进入等待状态,直到其他线程修改了共享变量使得条件满足,并通过信号唤醒等待线程。这种机制必须满足以下两个基本要素:

  1. 共享变量的修改:必须有至少一个线程能够修改影响条件的共享变量
  2. 互斥保护:所有对共享变量的访问和修改都必须通过互斥锁进行保护

错误实现示例分析

考虑以下看似合理的错误实现:

代码语言:javascript
复制
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
    pthread_mutex_unlock(&mutex);
    //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
    pthread_cond_wait(&cond, &mutex);
    pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

这个设计存在严重的竞态条件问题:

  1. 在解锁后到调用pthread_cond_wait之前存在时间窗口
  2. 其他线程可能在此期间获取锁、修改条件并发送信号
  3. 这会导致信号丢失,等待线程可能永远阻塞

假设有两个线程:消费者线程C和生产者线程P

时间

消费者线程C

生产者线程P

问题描述

t1

pthread_mutex_lock(&mutex)

等待锁

C获取锁

t2

while (condition_is_false) → true

等待锁

条件不满足

t3

pthread_mutex_unlock(&mutex)

等待锁

C释放锁

t4

时间窗口开始

pthread_mutex_lock(&mutex)

P获取锁

t5

准备调用 pthread_cond_wait

修改条件为true

P改变条件

t6

pthread_cond_signal(&cond)

P发送信号

t7

pthread_mutex_unlock(&mutex)

P释放锁

t8

调用 pthread_cond_wait(&cond, &mutex)

信号已错过!

t9

永久阻塞...

线程死锁

正确的原子性操作

正确的实现要求解锁和等待必须是原子操作,这正是pthread_cond_wait的设计目的:

  1. 函数原型int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
  2. 内部机制
    • 检查条件量是否为0
    • 将互斥量置为1(解锁)
    • 进入等待状态
    • 被唤醒后,将条件量置为1
    • 恢复互斥量原状态

4. 封装条件变量

和封装互斥量一样非常简单,代码如下:

代码语言:javascript
复制
#pragma once
#include <cstring>
#include "Mutex.hpp"


using namespace MutexModule;

namespace CondModule
{
    class Cond
    {
    public:
        Cond()
        {
            int n = pthread_cond_init(&_cond, nullptr);
            if (n != 0)
            {
                std::cerr << "cond init failed: " << strerror(n) << std::endl;
            }
        }

        void Wait(Mutex& mutex)
        {
            int n = pthread_cond_wait(&_cond, mutex.Get());
            if (n != 0)
            {
                std::cerr << "cond wait failed: " << strerror(n) << std::endl;
            }
        }

        void Signal()
        {
            int n = pthread_cond_signal(&_cond);
            if (n != 0)
            {
                std::cerr << "cond signal failed: " << strerror(n) << std::endl;
            }
        }

        void Broadcast()
        {
            int n = pthread_cond_broadcast(&_cond);
            if (n != 0)
            {
                std::cerr << "cond broadcast failed: " << strerror(n) << std::endl;
            }
        }

        ~Cond()
        {
            int n = pthread_cond_destroy(&_cond);
            if (n != 0)
            {
                std::cerr << "cond destroy failed: " << strerror(n) << std::endl;
            }
        }
    private:
        pthread_cond_t _cond;
    };
}

为了提高条件变量的通用性,建议在封装Cond类时避免直接引用内部的互斥量。这样可以在后续组合使用时避免因代码耦合导致的初始化困难,因为Mutex和Cond通常需要同时创建。

我们给互斥量新增一个接口,用于条件变量中需要wait获得锁的情况:

代码语言:javascript
复制
        pthread_mutex_t* Get()
        {
            return &_mutex;
        }

下面我们也可以将阻塞队列修改一下,将封装的互斥量和条件变量复用起来

代码语言:javascript
复制
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"

using namespace MutexModule;
using namespace CondModule;

const int  defaultcap = 5;

template <class T>
class BlockQueue
{
private:
    bool IsFull() { return _q.size() >= _cap; }
    bool IsEmpty() { return _q.empty(); }
public:
    BlockQueue(int cap = defaultcap)
        :_cap(cap), _csleep_num(0), _psleep_num(0)
    {}

    // 生产者生产数据入队列
    void Enqueue(const T& in)
    {
        LockGuard lockguard(_mutex);
        // 不能使用if判断,会虚假唤醒
        while(IsFull()) 
        {
            _psleep_num++;
            std::cout << "队列已满, 生产者进入休眠, 生产者休眠个数: " << _psleep_num << std::endl;
            _full_cond.Wait(_mutex);
            _psleep_num--;
        }

        // 此时队列必定有空间
        _q.push(in);

        // 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空
        // 那么就判断是否有消费者休眠,有就唤醒
        if(_csleep_num > 0)
        {
            _empty_cond.Signal();
            std::cout << "唤醒消费者..." << std::endl;
        }

    }

    // 消费者消费数据出队列
    T Pop()
    {
        LockGuard lockguard(_mutex);
        while(IsEmpty()) 
        {
            _csleep_num++;
            std::cout << "队列为空, 消费者进入休眠, 消费者休眠个数: " << _csleep_num << std::endl;
            _empty_cond.Wait(_mutex);
            _csleep_num--;
        }

        // 此时队列必定有空间
        T data = _q.front();
        _q.pop();

        // 只有队列为空时,消费者才会阻塞休眠,此时队列肯定不为空
        // 那么就判断是否有消费者休眠,有就唤醒
        if(_psleep_num > 0)
        {
            _full_cond.Signal();
            std::cout << "唤醒生产者..." << std::endl;
        }

        return data;
    }

    ~BlockQueue() {}
private:
    std::queue<T> _q;
    size_t _cap; // 队列容量大小

    Mutex _mutex;
    Cond _full_cond;
    Cond _empty_cond;

    int _csleep_num; // 消费者休眠的个数
    int _psleep_num; // 生产者休眠的个数
};

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-12-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 条件变量
    • 1.1 条件变量的基本概念
      • 条件变量的主要操作:
    • 1.2 条件变量函数
      • 初始化条件变量
      • 销毁条件变量
      • 等待条件满足
      • 唤醒等待的线程
      • 示例:
  • 2. 生产者消费者模型
    • 2.1 超市购物比喻:理解生产者消费者模型
      • 工作流程详解
      • 🎯 三种关键关系
      • 👥 两种角色
      • 🏪 一个交易场所:缓冲区
    • 2.2 为何使用生产者消费者模型?
    • 2.3 基于BlockingQueue的生产者消费者模型
      • BlockingQueue
      • 模拟阻塞队列的生产消费模型
  • 3. 为什么 pthread_cond_wait 需要互斥量?
  • 4. 封装条件变量
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档