前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Linux多线程【生产者消费者模型】

Linux多线程【生产者消费者模型】

作者头像
北 海
发布2023-10-27 17:18:29
3410
发布2023-10-27 17:18:29
举报

✨个人主页: 北 海 🎉所属专栏: Linux学习之旅 🎃操作环境: CentOS 7.6 腾讯云远程服务器


🌇前言

生产者消费者模型(CP模型)是一种非常经典的设计,常常出现在各种 「操作系统」 书籍中,深受教师们的喜爱;这种模型在实际开发中还被广泛使用,因为它在多线程场景中是十分高效的!

🏙️正文

1、生产者消费者模型

1.1、什么是生产者消费者模型?

「生产者消费者模型」是通过一个容器来解决生产者与消费者的强耦合关系,生产者与消费者之间不直接进行通讯,而是利用 「容器」来进行通讯

生产者?消费者?容器?耦合?晦涩难懂的名词难免让人打起退堂鼓,其实它们都很好理解,比如接下来我们可以借助一个 超市 的例子来深刻理解 生产者消费者模型


可以先忘掉之前的定义,接下来让我们看看 心连心超市 的工作模式

超市的工作模式

  • 超市从工厂进货,工厂需要向超市提供商品
  • 顾客在超市选购,超市需要向顾客提供商品

超市盈利的关键在于 平衡顾客与工厂间的供需关系

简单来说就是要做到 顾客可以在超市买到想要购买的商品,工厂也能同超市完成足量的需求订单,满足条件后,超市就可以盈利了

超市盈利的同时可以给供给双方带来便利

  • 顾客不需要跑到工厂购买商品
  • 工厂也不需要将商品配送到顾客手中

这就叫做 解决生产者与消费者间的强耦合关系

得益于 超市 做缓冲区,整个 生产消费 的过程十分高效,即便顾客没有在超市中找到想要的商品,也可借助超市之手向工厂进行反映,从而生产对应的商品,即 允许生产消费步调不一致

现实中的 超市工作模式 就是一个生动形象的 「生产者消费者模型」

  • 顾客 -> 「消费者」
  • 工厂 -> 「生产者」
  • 超市 -> 「交易场所(容器)」

生产者消费者模型的本质:忙闲不均

其中的 「交易场所」 是进行 生产消费 的容器,通常是一种特定的 缓冲区,常见的有 阻塞队列 和 环形队列

超市不可能只面向一个顾客及一个工厂,「交易场所」 也是如此,会被多个 生产者消费者(多个线程) 看到,也就是说 「交易场所」 注定是一个共享资源;在多线程环境中,需要保证 共享资源被多线程并发访问时的安全(详见 Linux多线程【线程互斥与同步】)


回归现实中,多个工厂供应同一种商品时,为了抢占更多的市场,总会通过一些促销手段来排除竞品,比如经典的 泡面巨头 <康师傅与统一> 的大战,市场(超市中的货架位置)是有限的,在工厂竞争之下,势必有一家工厂失去市场,因此可以得出 生产者与生产者之前需要维持 「互斥」 关系

生产者与生产者:「互斥」


张三和李四在超市偶遇,俩人同时看中了 「快乐牌刀片」,但最近超市货源紧张,这个商品仅有一份,张三李四互不谦让,都在奋力争夺这个商品,显然当商品只有一份时 消费者与消费者之间也需要维护 「互斥」关系

消费者与消费者:「互斥」


某天张三又来到了超市,打算购买他最喜欢的 老坛酸菜牛肉面,但好巧不巧,超市的最后一桶 老坛酸菜牛肉面 已经售出,张三只能通知超市进行备货,超市老板记下了这个需求,张三失落的回了家,刚到家,张三的肚子就饿的咕咕叫,十分想念 老坛酸菜牛肉面,于是火速赶往超市,看看超市是否有货,答案是没有,法外狂徒张三是一个执着的人,总是反复跑到超市查看是否有货,导致张三这一天什么事也干不成,只想着自己的 老坛酸菜牛肉面;其实张三不必这样做,只需要在第一次告诉超市老板自己的需求,并添加老板的联系方式,让老板在商品备货完成后通知张三前来购买,将商品信息同步给消费者,这样可以避免张三陷入循环,同理对于工厂来说,超市老板也应该添加工厂负责人的联系方式,将商品信息同步给生产者,也就是说 生产者与消费者之间存在 「同步」关系;除此之外,在超市备货期间,张三是不能来购买的,即 生产者与消费者之间还存在 「互斥」关系

生产者与消费者:同步、互斥

注意: 生产者与消费者之间的「互斥」关系不是必备的,目的是为了让 生产、消费 之间存在顺序

「生产者消费者模型」是一个存在 生产者、消费者、交易场所 三个条件,以及不同角色间的 同步、互斥 关系的高效模型

1.2、生产者消费者模型的特点

「生产者消费者模型」 的最根本特点是 321原则

3 种关系生产者与生产者:互斥 消费者与消费者:互斥 生产者与消费者:互斥与同步2 种角色 生产者 消费者1 个交易场所 通常是一个特定的缓冲区(阻塞队列、环形队列)

注:**321** 原则并非众所周知的概念,仅供辅助记忆 「生产者消费者模型」的特点

任何 「生产者消费者模型」 都离不开这些必备特点

生产者与消费者间的同步关系

  • 生产者不断生产,交易场所堆满商品后,需要通知消费者进行消费
  • 消费者不断消费,交易场所为空时,需要通知生产者进行生产

通知线程需要用到条件变量,即维护 同步 关系

其实之前在 Linux 进程间通信 【管道通信】 中学习到的 管道 本质上就是一个天然的 「生产者消费者模型」,因为它允许多个进程同时访问,并且不会出现问题,意味着它维护好了 「互斥、同步」 关系;当写端写满管道时,无法再写,通知读端进行读取;当管道为空时,无法读取,通知写端写入数据

1.3、生产者消费者模型的优点

「生产者消费者模型」为何高效?

  • 生产者、消费者 可以在同一个交易场所中进行操作
  • 生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置
  • 消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据
  • 可以根据不同的策略,调整生产者于与消费者间的协同关系

「生产者消费者模型」可以根据供需关系灵活调整策略,做到 忙闲不均

除此之外,「生产者消费者模型」 划分出了三个不同的条件:生产者、消费者、交易场所 各司其职,可以根据具体需求自由设计,很好地做到了 解耦,便于维护和扩展


2、基于阻塞队列实现生产者消费者模型

2.1、阻塞队列

编写 「生产者消费者模型」 需要用到 Linux 互斥与同步 的知识,这里先选择 阻塞队列 作为交易场所进行实现,在正式编写代码前,需要先认识一下 阻塞队列

阻塞队列 Blocking Queue 是一种特殊的队列,作为队列家族的一员,它具备 先进先出 FIFO 的基本特性,与普通队列不同的是: 阻塞队列 的大小是固定的,也就说它存在 容量 的概念

阻塞队列可以为空,也可以为满

将其带入 「生产者消费者模型」 中,入队 就是 生产商品,而 出队 则是 消费商品

  • 阻塞队列为满时:无法入队 -> 无法生产(阻塞)
  • 阻塞队列为空时:无法出队 -> 无法消费(阻塞)

是不是感觉跟 管道 十分相像,至于如何处理队空/队满的特殊情况,就需要借助 「互斥、同步」 相关知识了,具体在代码中体现

2.2、单生产单消费模型

首先来实现最简单的 单生产单消费者模型,首先搭好 阻塞队列类 的框架

创建 BlockingQueue.hpp 头文件

代码语言:javascript
复制
#pragma once

#include <queue>
#include <mutex>
#include <pthread.h>

// 命名空间,避免冲突
namespace Yohifo
{
#define DEF_SIZE 10

    template<class T>
    class BlockQueue
    {
    public:
        BlockQueue(size_t cap = DEF_SIZE)
            :_cap(cap)
        {
            // 初始化锁与条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }

        ~BlockQueue()
        {
            // 销毁锁与条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_cond);
        }

        // 生产数据(入队)
        void Push(const T& inData);

        // 消费数据(出队)
        void Pop(T* outData);
    private:
 
        // 判断是否为满
        bool IsFull();
        
        // 判断是否为空
        bool IsEmpty();

    private:
        std::queue<T> _queue;
        size_t _cap; // 阻塞队列的容量
        pthread_mutex_t _mtx; // 互斥锁(存疑)
        pthread_cond_t _cond; // 条件变量(存疑)
    };
}

如何判断阻塞队列是否为空:判断 queue 是否为空

如何判断阻塞队列是否为满:判断 queue 的大小是否为 _cap

使用 互斥锁 + 条件变量 实现互斥与同步

获得工具框架后,接下来搭建 生产与消费 的代码

因为是 单生产、单消费,只需要手动创建两个线程即可

创建 cp.cc 源文件

代码语言:javascript
复制
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "BlockingQueue.hpp"

void* Producer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 1.生产商品(通过某种渠道获取数据)
        // ...

        // 2.将商品推送至阻塞队列中
        // bq->Push(data);
    }

    pthread_exit((void*)0);
}

void* Consumer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 1.从阻塞队列中获取商品
        // bq->Pop(&data);

        // 2.消费商品(结合某种具体业务进行处理)
        // ...
    }

    pthread_exit((void*)0);
}

int main()
{
    // 创建一个阻塞队列
    Yohifo::BlockQueue<int>* bq = new Yohifo::BlockQueue<int>;

    // 创建两个线程(生产者、消费者)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, bq);
    pthread_create(&con, nullptr, Consumer, bq);

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    delete bq;
    return 0;
}

注意:

  1. 生产者、消费者需要看到同一个阻塞队列,可以通过线程的回调函数参数进行传递
  2. 其他具体实现细节仍需填充

以上就是 「生产者消费者模型」 所需要的大体框架,具体细节实现可以接着往下看,不过在这之前需要先理解 为什么生产、为什么消费


数据就像能量一样,不会凭空产生,也不会凭空消失,因此生产者线程在生产 商品(数据) 时,一定是从某种渠道获取的,比如客户发出的 HTTP请求、程序猿发出的 SQL 语句、涉及复杂运算的计算任务等,总之 生产者需要先获取数据,才能将其放入阻塞队列中,等待处理

同理,消费者线程在获取 商品(数据) 后,也需要结合业务逻辑做出不同的动作,比如根据 HTTP 请求进行响应、返回 SQL 查询结果、返回计算结果等,一句话总结:生产者生产商品、消费者消费商品都是需要时间的,并非单纯地对阻塞队列进行操作

这是一个十分重要的概念,它能帮助我们正确看待 生产者、消费者 的作用,这是被大多数教材忽略的重要概念


现在可以补充 BlockingQueue.hppcp.cc 具体细节了,首先来看看 BlockingQueue.hpp 的实现

BlockQueue 的成员变量问题(互斥锁、条件变量如何分配)

「生产者消费者模型」 中,有 满、空 两个条件,这两个条件是 绝对互斥 的,不可能同时满足,「生产者」关心是否为满「消费者」关心是否为空,两者关注的点不一样,也就是说不能只使用一个条件变量来控制两个条件,而是需要 一个生产者条件变量、一个消费者条件变量

代码语言:javascript
复制
BlockQueue(size_t cap = DEF_SIZE)
    :_cap(cap)
{
    // 初始化锁与条件变量
    pthread_mutex_init(&_mtx, nullptr);
    pthread_cond_init(&_pro_cond, nullptr);
    pthread_cond_init(&_con_cond, nullptr);
}

~BlockQueue()
{
    // 销毁锁与条件变量
    pthread_mutex_destroy(&_mtx);
    pthread_cond_destroy(&_pro_cond);
    pthread_cond_destroy(&_con_cond);
}

std::queue<T> _queue;
size_t _cap; // 阻塞队列的容量
pthread_mutex_t _mtx; // 互斥锁
pthread_cond_t _pro_cond; // 生产者条件变量
pthread_cond_t _con_cond; // 消费者条件变量

创建两个条件变量是阻塞队列的精髓之一

条件变量需要两个,锁是否也需要两把呢?

答案是不需要,因为无论是「生产者」还是「消费者」,它们需要看到同一个阻塞队列,因此使用一把互斥锁进行保护就行了

BlockQueuePushPop 函数简单实现

首先来看看 Push 函数的简单实现,Push 函数的功能就是将传入的数据 inData 添加到 阻塞队列

中,因为 阻塞队列 是一个 临界资源,在访问前必须加锁

代码语言:javascript
复制
// 生产数据(入队)
void Push(const T& inData)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    _queue.push(inData);

    pthread_mutex_unlock(&_mtx);
}

这样写只是对 普通临界资源 的访问,但这里是 阻塞队列,插入数据的前提是 有空间,不然是要被 阻塞 的,所以我们在进行数据插入前,应该先判断条件是否满足,不满足就得 阻塞等待条件满足

代码语言:javascript
复制
// 生产数据(入队)
void Push(const T& inData)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    // 判断条件是否满足
    if(IsFull())
    {
        pthread_cond_wait(&_pro_cond, &mtx);
    }
    
    _queue.push(inData);

    pthread_mutex_unlock(&_mtx);
}

// 判断是否为满
bool IsFull()
{
    return _queue.size() == _cap;
}

当条件不满足时,生产者线程进入等待状态

这里可以解释一下为什么需要给 pthread_cond_wait 函数传入互斥锁

  • 首先要明白,判断条件是否满足,是在临界区内进行的,也就是说当前线程持有锁
  • 当条件不满足时,当前线程进入条件等待状态,也就意味着它现在无法向后运行,将锁释放
  • 此时其他线程就得不到锁资源了,程序进入了死锁状态
  • 解决方法就是 将锁资源传递给 pthread_cond_wait 函数,使其拥有 「释放锁、获取锁」的能力,这样就能保证不会出现 死锁

这就是 同步 能解决 死锁 问题的关键,因为它可以主动让出 锁资源

过了一段时间,当条件满足时(消费者已经消费数据了),代码从 pthread_cond_wait 函数之后继续运行,生产者可以正常进行生产(可以确保一定有空位),一切看起来似乎很和谐,但此时存在一个致命问题:如果是「消费者」先阻塞(阻塞队列为空),「生产者」正常进行生产,当生产满后,「生产者」也进入了阻塞状态,此时就尴尬了,彼此都陷入了阻塞等待状态

造成此问题的根本原因是:「生产者」在生产结束后没有唤醒「消费者」,让其进行正常消费,所以在 生产完成 后,需要唤醒 「消费者」 进行消费

代码语言:javascript
复制
// 生产数据(入队)
void Push(const T& inData)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    // 判断条件是否满足
    if(IsFull())
    {
        pthread_cond_wait(&_pro_cond, &mtx);
    }

    _queue.push(inData);

    // 可以加策略唤醒,比如生产一半才唤醒消费者
    pthread_cond_signal(&_con_cond);

    pthread_mutex_unlock(&_mtx);
}

单生产、单消费场景中的 Push 可以这样写,其他场景就需要稍微进行修改

注意: 生产者唤醒的是消费者,也就是需要传递 _con_cond

当消费者没有 wait 等待,生产者仍然进行唤醒时,是否会出现问题? 答案是不会,唤醒一个没有 wait 的线程是不会有影响的,同时因为唤醒线程这个操作不需要加锁保护(本身就持有锁资源句柄),**pthread_cond_signal** 函数可以放到 pthread_mutex_unlock 语句之后

有了 Push 的实现经验后,Pop 的实现就很简单了,依葫芦画瓢,简单实现如下:

代码语言:javascript
复制
// 消费数据(出队)
void Pop(T* outData)
{
    // 加锁
    ptherad_mutex_lock(&_mtx);

    if(IsEmpty())
    {
        pthread_cond_wait(&_con_cond, &_mtx);
    }

    *outData = _queue.front();
    _queue.pop();

    // 可以加策略唤醒,比如消费完后才唤醒生产者
    pthread_cond_signal(&_pro_cond);

    pthread_mutex_unlock(&_mtx);
}

// 判断是否为空
bool IsEmpty()
{
    return _queue.empty();
}

这种写法也只适用于单生产、单消费场景中

注意: 消费者唤醒的是生产者,需要传递 _pro_cond


cp.cc 的使用填充

有了 「生产者消费者模型」 后,就可以进行简单使用了

因为这里没有具体的业务场景,所以我们就使用 生成一个随机数 作为待插入的数据,打印数字 作为获取数据后的操作

代码语言:javascript
复制
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "BlockingQueue.hpp"

void* Producer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 1.生产商品(通过某种渠道获取数据)
        int num = rand() % 10;

        // 2.将商品推送至阻塞队列中
        bq->Push(num);

        std::cout << "Producer 生产了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

void* Consumer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 1.从阻塞队列中获取商品
        int num;
        bq->Pop(&num);

        // 2.消费商品(结合某种具体业务进行处理)
        std::cout << "Consumer 消费了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::BlockQueue<int>* bq = new Yohifo::BlockQueue<int>;

    // 创建两个线程(生产者、消费者)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, bq);
    pthread_create(&con, nullptr, Consumer, bq);

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    delete bq;
    return 0;
}

此时可以编译并运行程序,可以看到 生产者疯狂生产,消费者疯狂消费

这样不容易观察到 阻塞队列 的特点,我们可以通过 睡眠 的方式模拟效果

策略1:消费者每隔一秒消费一次,生产者疯狂生产

应该观察到的现象是 生产者很快就把阻塞队列填满了,只能阻塞等待,**1** 秒之后,消费者进行消费,消费结束后唤醒生产者,两者进入协同状态:生产者生产一个数据、消费者消费一个数据

代码语言:javascript
复制
void* Consumer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 消费者每隔一秒进行一次消费
        sleep(1);

        // 1.从阻塞队列中获取商品
        int num;
        bq->Pop(&num);

        // 2.消费商品(结合某种具体业务进行处理)
        std::cout << "Consumer 消费了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

程序运行结果符合预期

策略2:生产者每隔一秒生产一次,消费者不断消费

预期结果为 刚开始阻塞队列为空,消费者无法进行消费,只能阻塞等待,一秒后,生产者生产了一个数据,并立即通知消费者进行消费,两者协同工作,消费者消费的就是生产者刚刚生产的数据

代码语言:javascript
复制
void* Producer(void *args)
{
    Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);

    while(true)
    {
        // 生产者每隔一秒生产一次
        sleep(1);
        
        // 1.生产商品(通过某种渠道获取数据)
        int num = rand() % 10;

        // 2.将商品推送至阻塞队列中
        bq->Push(num);

        std::cout << "Producer 生产了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

运行结果如下

两种策略都符合预期,证明当前的 「生产者消费者模型」 是可用的(单生产单消费场景中)

2.3、多生产多消费模型

在上面的 「生产者消费者模型」 中,存在一些细节问题

细节1:只有当条件满足时,才能进行 生产/消费

之前单纯使用一个 if 进行判断过于草率

理由如下:

  1. pthread_cond_wait 函数可能调用失败(误唤醒、伪唤醒),此时如果是 if 就会向后继续运行,导致在条件不满足的时候进行了 生产/消费
  2. 在多线程场景中,可能会使用 pthread_cond_broadcast 唤醒所有等待线程,如果在只生产了一个数据的情况下,唤醒所有线程,会导致只有一个线程进行了合法操作,其他线程都是非法操作了

关于当前代码使用 if 判读,在多线程环境中广播 pthread_cond_broadcast的理解 这就好比食堂里有很多人等待出餐,当阿姨仅做好一份饭后,就通知所有同学过来取餐,直接导致其他同学白跑一趟;带入程序中,直接影响就是 生产者/消费者 在 队列满/队列空 的情况下,仍然进行了 数据生产/数据消费

所以需要把条件判断改成 while,直到条件满足后,才向后运行

代码语言:javascript
复制
// 生产数据(入队)
void Push(const T& inData)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    // 循环判断条件是否满足
    while(IsFull())
    {
        pthread_cond_wait(&_pro_cond, &_mtx);
    }

    _queue.push(inData);

    // 可以加策略唤醒,比如生产一半才唤醒消费者
    pthread_cond_signal(&_con_cond);

    pthread_mutex_unlock(&_mtx);
}

// 消费数据(出队)
void Pop(T* outData)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    // 循环判读条件是否满足
    while(IsEmpty())
    {
        pthread_cond_wait(&_con_cond, &_mtx);
    }

    *outData = _queue.front();
    _queue.pop();

    // 可以加策略唤醒,比如消费完后才唤醒生产者
    pthread_cond_signal(&_pro_cond);

    pthread_mutex_unlock(&_mtx);
}

细节2:生产者消费者模型的高效体现在 「解耦」

生产、消费 的过程是加锁的、串行化执行,可能有的人无法 get「生产者消费者模型」 的高效,这是因为没有对 「生产者消费者模型」 进行一个全面的理解

单纯的向队列中放数据、从队列中取数据本身效率就很高,但 生产者从某种渠道获取数据、消费者获取数据后进行某种业务处理,这是效率比较低的操作,「生产者消费者模型」 做到了这两点

1.消费者在进行业务处理时,生产者可以直接向队列中 push 数据

比如 消费者 在获取到数据后,需要进行某种高强度的运算,当然这个操作与 生产者 是没有任何关系的,得益于 阻塞队列 作为缓冲区,生产者 可以在 消费者 进行运算时 push 数据

这就好比你买了一桶泡面回家吃,厂商并不需要关心你吃完没有,直接正常向超市供货就行了

2.生产者在进行数据生产时,消费者可以直接向队列中 pop 数据

同上,消费者 不需要关心 生产者 的状态,只要 阻塞队列 中还有数据,正常 pop 获取就行了;也就是说你在超市购物时,无需关心工厂的生产情况,因为这与你无关

一句话总结:生产者不必关心消费者的消费情况,消费者也不需要关心生产者的生产情况

而这就是 「生产者消费者模型」 高效的体现,也是对模型的全面理解

这有点像 「冯·诺依曼体系结构」 中的 内存,扮演着中间商的角色,使得 CPU 能和 外设 协同高效工作


细节3:阻塞队列中不止能放 int**,还能放对象**

对象包罗万物,可玩性非常高,比如这里增加一个简单 Task 任务类,实现基本的两数运算

创建 Task.hpp 头文件

代码语言:javascript
复制
#pragma once

#include <string>

namespace Yohifo
{
    // 支持泛型
    template<class T>
    class Task
    {
    public:
        Task(T x = 0, T y = 0, char op = '+')
            :_x(x), _y(y), _op(op), _res(0), _err(0)
        {}

        // 重载运算操作
        void operator()()
        {
            // 简单计算
            switch(_op)
            {
                case '+':
                    _res = _x + _y;
                break;
                case '-':
                    _res = _x - _y;
                break;
                case '*':
                    _res = _x * _y;
                break;
                case '/':
                    if(_y == 0)
                        _err = -1;
                    else
                        _res = _x / _y;    
                break;
                case '%':
                    if(_y == 0)
                        _err = -2;
                    else
                        _res = _x % _y;    
                break;
                default:
                    _err = 1;
                break;
            }
        }

        // 获取计算结果
        std::string GetResult()
        {
            // 根据错误标识,返回计算结果
            std::string ret = std::to_string(_x) + " " + _op + " " + std::to_string(_y);
            
            if(_err)
            {
                ret += " error";

                // 判读是 / 错误还是 % 错误
                if(_res == -1)
                    ret += " [-1] \t / 0 引发了错误";
                else
                    ret += " [-2] \t % 0 引发了错误";
            }
            else
            {
                ret += " = " + std::to_string(_res);
            }

            return ret;
        }

    private:
        T _x;
        T _y;
        char _op; // 运算符
        T _res; // 结果
        int _err; // 错误标识
    };
}

得到这样一个任务类后,就可以更改 cp.cc 中生产者、消费者线程的处理逻辑了

这里就简单修改,随机获取两个数和一个运算符,并计算出结果

代码语言:javascript
复制
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "BlockingQueue.hpp"
#include "Task.hpp"

void* Producer(void *args)
{
    Yohifo::BlockQueue<Yohifo::Task<int>>* bq = static_cast<Yohifo::BlockQueue<Yohifo::Task<int>>*>(args);

    // 运算符集
    std::string opers = "+-*/%";

    while(true)
    {
    	// 生产者每隔一秒生产一次
        sleep(1);
     
        // 1.生产商品(通过某种渠道获取数据)
        // 随机获取两个数(可以改为输入)
        int x = rand() % 100;
        int y = rand() % 100;

        // 随机获取一种运算符
        char ops[] = {'+', '-', '*', '/', '%'};
        char op = opers[rand() % opers.size()];

        // 2.将商品推送至阻塞队列中
        // 创建匿名对象,并 Push 入阻塞队列中
        bq->Push(Yohifo::Task<int>(x, y, op));

        std::cout << "Producer 生产了: " << x << " " << y << " " << op << " 构成的对象" << std::endl;
        std::cout << "----------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

void* Consumer(void *args)
{
    Yohifo::BlockQueue<Yohifo::Task<int>>* bq = static_cast<Yohifo::BlockQueue<Yohifo::Task<int>>*>(args);

    while(true)
    {        
        // 1.从阻塞队列中获取商品
        Yohifo::Task<int> task;
        bq->Pop(&task);

        // 进行业务处理
        task();

        std::string ret = task.GetResult();

        // 2.消费商品(结合某种具体业务进行处理)
        std::cout << "Consumer 消费了一个对象,并获得结果: " << ret << std::endl;
        std::cout << "===========================" << std::endl;
    }

    pthread_exit((void*)0);
}

int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::BlockQueue<Yohifo::Task<int>>* bq = new Yohifo::BlockQueue<Yohifo::Task<int>>;

    // 创建两个线程(生产者、消费者)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, bq);
    pthread_create(&con, nullptr, Consumer, bq);

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    delete bq;
    return 0;
}

为了避免打印到显示器时的格式错乱问题(屏幕也是临界资源,理论上也需要加锁保护),这里让 生产者 每隔一秒生产一次,进而控制 消费速度

这里故意把 _y 指定为 0,查看运算出错的情况

格局打开,这里只是放了一个简单计算的任务,我们实际还可以放入更复杂的任务,比如 网络请求、**SQL** 查询、并行 IO

尤其是 IO,使用 「生产者消费者模型」 可以大大提高效率,包括后面的 多路转接,也可以接入 「生产者消费者模型」 来提高效率


OK,现在可以尝试修改代码以适应 多生产多消费场景

需要改吗?不需要,至少在当前的代码设计中,我们的代码完全可以应付 多线程多消费

接下来在原有代码的基础上,直接多创建几个线程

代码语言:javascript
复制
int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::BlockQueue<Yohifo::Task<int>>* bq = new Yohifo::BlockQueue<Yohifo::Task<int>>;

    // 创建多个线程(生产者、消费者)
    pthread_t pro[2], con[3];

    for(int i = 0; i < 2; i++)
        pthread_create(pro + i, nullptr, Producer, bq);

    for(int i = 0; i < 3; i++)
        pthread_create(con + i, nullptr, Consumer, bq);
    
    for(int i = 0; i < 2; i++)
        pthread_join(pro[i], nullptr);
    for(int i = 0; i < 3; i++)
        pthread_join(con[i], nullptr);

    delete bq;
    return 0;
}

运行结果如下,可以看到,确实有多个线程在运行,运行结果也没有问题

为什么当前代码设计中不需要修改就能适用于 多生产多消费场景 呢?

原因有两点:

  1. 生产者、消费者都是在对同一个 _queue 操作,用一把锁,保护一个临界资源,足够了
  2. 当前的 _queue 始终是被当作一个整体使用的,无需再增加锁区分

其实分别给 生产者和消费者 各配一把锁也是可以的,但在当前代码设计中(使用同一个 _queue),完全没有必要

以上就是关于 基于阻塞队列实现「生产者消费者模型」的全部内容了,除了使用互斥锁外,还可以使用信号量,也就是使用环形队列来实现 「生产者消费者模型」


3、POSIX 信号量

3.1、信号量的基本知识

互斥、同步 不只能通过 互斥锁、条件变量 实现,还能通过 信号量 sem**、互斥锁** 实现(出自 POSIX 标准)

「信号量」 的本质就是一个 计数器

  • 申请到资源,计数器 --**(**P 操作)
  • 释放完资源,计数器 ++**(**V 操作)

「信号量」PV 操作都是原子的,假设将 「信号量」 的值设为 1,用来表示 「生产者消费者模型」阻塞队列 _queue 的使用情况

  • sem 值为 1 时,线程可以进行 「生产 / 消费」,**sem--**
  • sem 值为 0 时,线程无法进行 「生产 / 消费」,只能阻塞等待

此时的 「信号量」 只有两种状态:10,可以实现类似 互斥锁 的效果,即实现 线程互斥,像这种只有两种状态的信号量称为 「二元信号量」

「信号量」 不止可以用于 互斥,它的主要目的是 描述临界资源中的资源数目,比如我们可以把 阻塞队列 切割成 N 份,初始化 「信号量」 的值为 N,当某一份资源就绪时,sem--,资源被释放后,sem++,如此一来可以像 条件变量 一样实现 同步

  • sem == N 时,阻塞队列已经空了,消费者无法消费
  • sem == 0 时,阻塞队列已经满了,生产者无法生产

用来实现 互斥、同步 的信号量称为 「多元信号量」

综上所述,在使用 「多元信号量」 访问资源时,需要先申请 「信号量」,只有申请成功了才能进行资源访问,否则会进入阻塞等待,即当前资源不可用

在实现 互斥、同步 时,该如何选择? 结合业务场景进行分析,如果待操作的共享资源是一个整体,比较适合使用 互斥锁+条件变量 的方案,但如果共享资源是多份资源,使用 信号量 就比较方便

其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订(详见 Linux进程间通信【消息队列、信号量】

对于 「信号量」 的第一层理解:申请信号量实际是一种资源预订机制

只要申请 「信号量」 成功了,就一定可以访问临界资源

如果将 「信号量」 实际带入我们之前写的 「生产者消费者模型」 代码中,是不需要进行资源条件判断的,因为 「信号量」本身就已经是资源的计数器了

对于 「信号量」 的第二层理解:使用信号量时,就已经把资源条件判断转化成了信号量的申请行为

比如可以直接这样写

代码语言:javascript
复制
// 生产数据(入队)
void Push(const T& inData)
{
    // 申请信号量 P操作
    // ...

    _queue.push(inData);

    // ...
    // 释放信号量 V操作
}
3.2、信号量相关操作

有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是释放简单的,依旧是只有四个接口:初始化、销毁、申请、释放

初始化信号量

代码语言:javascript
复制
#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数1:需要初始化的信号量,**sem_t** 实际就是一个联合体,里面包含了一个 char 数组,以及一个 long int 成员

代码语言:javascript
复制
typedef union
{
  char __size[__SIZEOF_SEM_T];
  long int __align;
} sem_t;

参数2:表示当前信号量的共享状态,传递 0 表示线程间共享,传递 非0 表示进程间共享

参数3:信号量的初始值,可以设置为双元或多元信号量

返回值:初始化成功返回 0**,失败返回** -1**,并设置错误码**

销毁信号量

代码语言:javascript
复制
#include <semaphore.h>

int sem_destroy(sem_t *sem);

参数:待销毁的信号量

返回值:成功 0**,失败** -1**, 并设置错误码**

申请信号量(等待信号量)

代码语言:javascript
复制
#include <semaphore.h>

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

主要使用 sem_wait

参数:表示从哪个信号量中申请

返回值:成功返回 0**,失败返回** -1**,并设置错误码**

其他两种申请方式分别是:尝试申请,如果没有申请到资源,就会放弃申请;每隔一段时间进行申请,即 timeout

释放信号量(发布信号量)

代码语言:javascript
复制
#include <semaphore.h>

int sem_post(sem_t *sem);

参数:将资源释放到哪个信号量中

返回值:成功返回 0**,失败返回** -1**,并设置错误码**

这批接口属于是看一眼就会用,再多看一眼就会爆炸,参数、返回值含义基本都相同,非常容易上手,接下来直接用信号量实现 「生产者消费者模型」


4、基于环形队列实现生产者消费者模型

4.1、环形队列

「生产者消费者模型」 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列,所谓的 环形队列 并非 队列,而是用数组模拟实现的 “队列”, 并且它的 判空、判满 比较特殊

如何让 环形队列 “转” 起来?

  • 可以通过取模的方式(可以重复获取一段区间值),确定下标

环形队列 如何判断当前为满、为空?

策略一:多开一个空间,**head**、**tail** 位于同一块空间中时,表示当前队列为空;在进行插入、获取数据时,都是对下一块空间中的数据进行操作,因为多开了一块空间,当待生产的数据落在 head 指向的空间时,就表示已经满了

策略二:参考阻塞队列,搞一个计数器,当计数器的值为 0 时,表示当前为空,当计数器的值为容量时,表示队列为满

这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器

环形队列 中,生产者消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据

除非两者相遇,其他情况下生产者、消费者可以并发运行(同时访问环形队列)

两者错位时正常进行生产消费就好了,但两者相遇时需要特殊处理,也就是处理 空、满 两种情况,这就是 环形队列 的运转模式


这里可以引入一个小游戏,来辅助理解 环形队列 的运转模式

假设存在一个大圆桌,上面摆放了一圈空盘子,可以往上面放苹果,也可以取上面的苹果

张三和李四打算展开一场 苹果追逐赛,张三作为 追逐方,目标是移动并获取盘子中的苹果,李四作为 被追逐方,目标是往盘子中放苹果,并向下一个空盘子移动

注意:这里的移动指顺时针移动,不能跳格,这是游戏核心规则

游戏基本规则:

  1. 当两者相遇,且圆桌中没有苹果时,被追逐方(李四)先跑,对方(张三)阻塞
  2. 当两者相遇,且圆桌中全是苹果时,追逐方(张三)先跑,对方(李四)阻塞
  3. 被追逐方(李四)不能套圈追逐方(张三)
  4. 同时追逐方(张三)也不能超过被追逐方(李四)

ok,现在游戏开始,张三和李四处于同一块空间中(起点),此时两人处于一种特殊情况中,不能同时进行 苹果拾取/苹果放置,由于是刚开始,作为 被追逐方 的李四理应先走,否则两者就都阻塞了(张三追上李四时的情况与刚开始的情况一致)

所以可以得出结论:环形队列为空时,生产者需要先生产数据,消费者阻塞

李四先跑,边跑边放苹果,此时因为张三还没有追上李四,所以张三也是边跑边拾取苹果,两者展开了激烈的追逐赛(高效率)

在追逐过程中,张三李四都能同时对圆桌中的格子进行操作,这是非常高效的,环形队列不为空、不为满时,生产者、消费者可以同时进行并发操作

游戏进行到白热化阶段,法外狂徒张三一不注意摔了一跤,导致拾取苹果的速度不断减慢,李四见状火力全开,不断放置苹果,很快张三就被李四追上了,此时场上已经摆满了苹果,规定一个盘子只能放置一个苹果,李四无法在放置苹果,只能阻塞等待张三进行苹果拾取

场上摆满苹果的情况对应着 环形队列为满的情况,生产者不能再生产,消费者需要进行消费

ok,游戏到这里就可以结束了,因为已经足够总结出 环形队列 的运作模式了

被追逐方(李四) -> 生产者

追逐方(张三) -> 消费者

大圆桌 -> 环形队列

空盘 -> 无数据,可生产

苹果 -> 有数据,可消费

运作模式

  • 环形队列为空时:消费者阻塞,只能由生产者进行生产,生产完商品后,消费者可以消费商品
  • 环形队列为满时:生产者阻塞,只能由消费者进行消费,消费完商品后,生产者可以生产商品
  • 其他情况:生产者、消费者并发运行,各干各的事,互不影响

张三和李四也就只能在 满、空 时相遇了


忘记张三和李四的小游戏,将 环形队列 的运行模式带入 「生产者消费者模型」

可以使用 「信号量」 标识资源的使用情况,但生产者和消费者关注的资源并不相同,所以需要使用两个 「信号量」 来进行操作

  • 生产者信号量:标识当前有多少可用空间
  • 消费者信号量:标识当前有多少数据

如果说搞两个 条件变量阻塞队列 的精髓,那么搞两个 信号量 就是 环形队列 的精髓,显然,刚开始的时候,生产者信号量初始值为环形队列的大小,消费者信号量初始值为 0

无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费

比如上图中的 pro_sem 就表示 生产者还可以进行 3 次生产con_sem 表示 消费者还可以消费 5

生产者、消费者对于 「信号量」 的申请可以这样理解

代码语言:javascript
复制
// 生产者
void Producer()
{
	// 申请信号量(空位 - 1)
	sem_wait(&pro_sem);

	// 生产商品
	// ...

	// 释放信号量(商品 + 1)
	sem_post(&con_sem);
}

// 消费者
void Consumer()
{
	// 申请信号量(商品 - 1)
	sem_wait(&con_sem);

	// 消费商品
	// ...

	// 释放信号量(空位 + 1)
	sem_post(&pro_sem);
}

生产者和消费者指向同一个位置时保证线程安全,其他情况保证并发度

至于怎么落实到代码中,需要接着往下看

4.2、单生产单消费模型

首先来实现简单点的单生产、单消费版 「生产者消费者模型」

起手先创建一个 环形队列 头文件

创建 RingQueue.hpp 头文件

代码语言:javascript
复制
#pragma once

#include <vector>
#include <semaphore.h>

namespace Yohifo
{
#define DEF_CAP 10

    template<class T>
    class RingQueue
    {
    public:
        RingQueue(size_t cap = DEF_CAP)
            :_cap(cap), _pro_step(0), _con_step(0)
        {
            _queue.resize(_cap);

            // 初始化信号量
            sem_init(&_pro_sem, 0, _cap);
            sem_init(&_con_sem, 0, 0);
        }

        ~RingQueue()
        {
            // 销毁信号量
            sem_destroy(&_pro_sem);
            sem_destroy(&_con_sem);
        }

        // 生产商品
        void Push(const T &inData)
        {
            // 申请信号量
            P(&_pro_sem);

            // 生产
            _queue[_pro_step++] = inData;
            _pro_step %= _cap;

            // 释放信号量
            V(&_con_sem);
        }

        // 消费商品
        void Pop(T *outData)
        {
            // 申请信号量
            P(&_con_sem);

            // 消费
            *outData = _queue[_con_step++];
            _con_step %= _cap;

            // 释放信号量
            V(&_pro_sem);
        }

    private:
        void P(sem_t *sem)
        {
            sem_wait(sem);
        }

        void V(sem_t *sem)
        {
            sem_post(sem);
        }

    private:
        std::vector<T> _queue;
        size_t _cap;
        sem_t _pro_sem;
        sem_t _con_sem;
        size_t _pro_step; // 生产者下标
        size_t _con_step; // 消费者下标
    };
}

细节:

  • 生产者的信号量初始值为 DEF_CAP
  • 消费者的信号量初始值为 0
  • 生产者、消费者的起始下标都为 0

在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?

通过两个 信号量,当两个 信号量 都不为 0 时,双方可以并发操作,这是 环形队列 最大的特点;当 生产者信号量为 0 时,生产者陷入阻塞等待,等待消费者消费;同理当 消费者信号量为 0 时,消费者也会阻塞住,在这里阻塞就是 互斥 的体现。当对方完成 生产 / 消费 后,自己会解除阻塞状态,而这就是 同步

目前代码没问题(单生产单消费场景中)

创建 cp.cc 源文件(可以复用之前的测试代码)

这个没啥好说的,直接 copy 之前的代码,稍微修改下类名即可

代码语言:javascript
复制
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "RingQueue.hpp"

void* Producer(void *args)
{
    Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);

    while(true)
    {
        // 生产者慢一点
        sleep(1);

        // 1.生产商品(通过某种渠道获取数据)
        int num = rand() % 10;

        // 2.将商品推送至阻塞队列中
        rq->Push(num);

        std::cout << "Producer 生产了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

void* Consumer(void *args)
{
    Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);

    while(true)
    {
        // 消费者慢一点
        // sleep(1);

        // 1.从阻塞队列中获取商品
        int num;
        rq->Pop(&num);

        // 2.消费商品(结合某种具体业务进行处理)
        std::cout << "Consumer 消费了一个数据: " << num << std::endl;
        std::cout << "------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::RingQueue<int>* rq = new Yohifo::RingQueue<int>;

    // 创建两个线程(生产者、消费者)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, rq);
    pthread_create(&con, nullptr, Consumer, rq);

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    delete rq;
    return 0;
}

编译并运行程序。为了使结果更加清晰,分别展示 生产者每隔一秒生产一次、消费者每隔一秒消费一次的结果

生产者每隔一秒生产一次

消费者每隔一秒消费一次

这里的运行结果与 阻塞队列 那边的一模一样,证明当前的 「生产者消费者模型」 没有问题(单生产单消费场景中)

注:如果想要提高并发度,可以增大环形队列的容量

4.3、多生产多消费模型

环形队列 中可不止能放整数,还能放 Task 任务,我们可以把之前的 Task.hpp 引入,重新测试 环形队列

引入 Task.hpp 创建 cp.cc,在之前的基础上略微修改

代码语言:javascript
复制
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "RingQueue.hpp"
#include "Task.hpp"

void* Producer(void *args)
{
    Yohifo::RingQueue<Yohifo::Task<int>>* rq = static_cast<Yohifo::RingQueue<Yohifo::Task<int>>*>(args);

    // 运算符集
    std::string opers = "+-*/%";

    while(true)
    {
        // 1.生产商品(通过某种渠道获取数据)
        // 随机获取两个数(可以改为输入)
        int x = rand() % 100;
        int y = rand() % 100;

        // 随机获取一种运算符
        char ops[] = {'+', '-', '*', '/', '%'};
        char op = opers[rand() % opers.size()];

        // 生产商品需要时间
        usleep(10000);

        // 2.将商品推送至阻塞队列中
        // 创建匿名对象,并 Push 入阻塞队列中
        rq->Push(Yohifo::Task<int>(x, y, op));

        std::cout << "Producer 生产了: " << x << " " << y << " " << op << " 构成的对象" << std::endl;
        std::cout << "----------------------------" << std::endl;
    }

    pthread_exit((void*)0);
}

void* Consumer(void *args)
{
    Yohifo::RingQueue<Yohifo::Task<int>>* rq = static_cast<Yohifo::RingQueue<Yohifo::Task<int>>*>(args);

    while(true)
    {        
        // 1.从阻塞队列中获取商品
        Yohifo::Task<int> task;
        rq->Pop(&task);

        // 进行业务处理
        task();

        // 消费商品也需要时间
        usleep(10000);

        std::string ret = task.GetResult();

        // 2.消费商品(结合某种具体业务进行处理)
        std::cout << "Consumer 消费了一个对象,并获得结果: " << ret << std::endl;
        std::cout << "===========================" << std::endl;
    }

    pthread_exit((void*)0);
}

int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::RingQueue<Yohifo::Task<int>>* rq = new Yohifo::RingQueue<Yohifo::Task<int>>;

    // 创建两个线程(生产者、消费者)
    pthread_t pro, con;
    pthread_create(&pro, nullptr, Producer, rq);
    pthread_create(&con, nullptr, Consumer, rq);

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    delete rq;
    return 0;
}

环形队列版的 CP 模型也能适用于任务执行


接下来可以实现 多生产多消费场景 中的 CP 模型了,多生产多消费无非就是增加了 消费者与消费者生产者与生产者 间的 互斥 关系,加锁就行了,现在问题是加几把锁?

答案是 两把,因为当前的 生产者和消费者 关注的资源不一样,一个关注剩余空间,另一个关注是否有商品,一把锁是无法锁住两份不同资源的,所以需要给 生产者、消费者 各配一把锁

阻塞队列 中为什么只需要一把锁? 因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了

代码语言:javascript
复制
#pragma once

#include <vector>
#include <mutex>
#include <semaphore.h>

namespace Yohifo
{
#define DEF_CAP 10

    template<class T>
    class RingQueue
    {
    public:
        RingQueue(size_t cap = DEF_CAP)
            :_cap(cap), _pro_step(0), _con_step(0)
        {
            _queue.resize(_cap);

            // 初始化信号量
            sem_init(&_pro_sem, 0, _cap);
            sem_init(&_con_sem, 0, 0);

            // 初始化互斥锁
            pthread_mutex_init(&_pro_mtx, nullptr);
            pthread_mutex_init(&_con_mtx, nullptr);
        }

        ~RingQueue()
        {
            // 销毁信号量
            sem_destroy(&_pro_sem);
            sem_destroy(&_con_sem);

            // 销毁互斥锁
            pthread_mutex_destroy(&_pro_mtx);
            pthread_mutex_destroy(&_con_mtx);
        }

        // 生产商品
        void Push(const T &inData)
        {
            // 申请信号量
            P(&_pro_sem);

            Lock(&_pro_mtx);

            // 生产
            _queue[_pro_step++] = inData;
            _pro_step %= _cap;

            UnLock(&_pro_mtx);

            // 释放信号量
            V(&_con_sem);
        }

        // 消费商品
        void Pop(T *outData)
        {
            // 申请信号量
            P(&_con_sem);

            Lock(&_con_mtx);

            // 消费
            *outData = _queue[_con_step++];
            _con_step %= _cap;

            UnLock(&_con_mtx);

            // 释放信号量
            V(&_pro_sem);
        }

    private:
        void P(sem_t *sem)
        {
            sem_wait(sem);
        }

        void V(sem_t *sem)
        {
            sem_post(sem);
        }

        void Lock(pthread_mutex_t *lock)
        {
            pthread_mutex_lock(lock);
        }

        void UnLock(pthread_mutex_t *lock)
        {
            pthread_mutex_unlock(lock);
        }

    private:
        std::vector<T> _queue;
        size_t _cap;
        sem_t _pro_sem;
        sem_t _con_sem;
        size_t _pro_step; // 生产者下标
        size_t _con_step; // 消费者下标

        pthread_mutex_t _pro_mtx;
        pthread_mutex_t _con_mtx;
    };
}

细节: 加锁行为放在信号量申请成功之后,可以提高并发度

环形队列 中,可以在申请 「信号量」 前进行加锁,也可以在申请 「信号量」 后进行加锁,这里比较推荐的是 在申请 「信号量」 后加锁

如何理解?

这就好比一群学生在进行座位编排,可以先放一个学生进入教室,再给他确定座位;也可以先给每个人确定好自己的座位(一人一座),然后排队进入教室,对号入座即可。先申请 「信号量」 相当于先确定座位,避免进入教室(加锁)后还得选座位

加锁意味着串行化,一定会降低效率,但因为 「信号量」 的操作是原子的,可以确保线程安全,也就不需要加锁保护;也就是可以并发申请 「信号量」,再串行化访问临界资源

接下来增加 生产者、消费者 的线程数量,并进行测试

修改 cp.cc

代码语言:javascript
复制
// ...

int main()
{
    // 种 种子
    srand((size_t)time(nullptr));

    // 创建一个阻塞队列
    Yohifo::RingQueue<Yohifo::Task<int>>* rq = new Yohifo::RingQueue<Yohifo::Task<int>>;

    // 创建多个线程(生产者、消费者)
    pthread_t pro[10], con[20];

    for(int i = 0; i < 10; i++)
    pthread_create(pro + i, nullptr, Producer, rq);

    for(int i = 0; i < 20; i++)
    pthread_create(con + i, nullptr, Consumer, rq);

    for(int i = 0; i < 10; i++)
    pthread_join(pro[i], nullptr);

    for(int i = 0; i < 20; i++)
        pthread_join(con[i], nullptr);

    delete rq;
    return 0;
}

此时一批线程就都运行起来了

阻塞队列 效率已经够高了,那么创造 环形队列 的意义在哪呢?

首先要明白 「生产者消费者模型」 高效的地方从来都不是往缓冲区中放数据、从缓冲区中拿数据

对缓冲区的操作对于计算机说就是小 case,需要关注的点在于 获取数据和消费数据,这是比较耗费时间的,阻塞队列 至多支持获取 一次数据获取一次数据消费,在代码中的具体体现就是 所有线程都在使用一把锁,并且每次只能 push**、**pop 一个数据;而 环形队列 就不一样了,生产者、消费者 可以通过 条件变量 知晓数据获取、数据消费次数,并且由于数据获取、消费操作没有加锁,支持并发,因此效率十分高

环形队列 中允许 N 个生产者线程一起进行数据获取,也允许 N 个消费者线程一起进行数据消费,简单任务处理感知不明显,但复杂任务就不一样了,这就有点像同时下载多份资源,是可以提高效率的

注意: 一起操作并非同时操作,任务开始时间有先后,但都是在进行处理的

环形队列 一定优于 阻塞队列 吗?

答案是否定的,存在即合理,如果 环形队列 能完全碾压 阻塞队列,那么早就不用学习 阻塞队列 了,这两种都属于 「生产者消费者模型」 常见的交易场所,有着各自的适用场景

特征

阻塞队列(互斥锁实现)

环形队列(信号量实现)

内部同步机制

使用互斥锁或类似的锁机制来实现线程安全

使用信号量来实现线程安全

阻塞操作

支持阻塞操作,当队列为空或已满时,线程可以等待

也支持阻塞操作,当队列为空或已满时,线程可以等待

数据覆盖

通常不会覆盖已有元素,新元素添加时需要等待队列有空间

有界的,当队列已满时,添加新元素会覆盖最早的元素

实现复杂度

实现可能较为复杂,需要处理锁的获取和释放

实现相对较简单,需要管理信号量

线程安全

通过锁来保证线程安全,容易引入死锁问题

通过信号量来保证线程安全,不易引入死锁问题

添加和删除操作时间复杂度

O(1)(在队列未满或非空时)

O(1)(常数时间,除非队列已满或为空)

应用场景

多线程数据传递,任务调度,广播通知等

循环缓存,数据轮询,循环任务调度等


🌆总结

以上就是本次关于 Linux多线程【生产者消费者模型】的全部内容了,在本文中我们首先学习了「生产者消费者模型」的基本概念,然后学习了阻塞队列与环形队列这两种交易场所,并分别用代码进行了实现,当然还学习了信号量这个强大工具。多线程编程中,最重要的是确保线程安全问题,而 「生产者消费者模型」 在确保线程安全的同时提高了并发操作的效率,值得学习和使用


相关文章推荐 Linux多线程 =====:> 【初始多线程】【线程控制】【线程互斥与同步】 Linux进程信号 ===== :> 【信号产生】【信号保存】【信号处理】 Linux进程间通信 ===== :> 【消息队列、信号量】【共享内存】【命名管道】【匿名管道】 Linux基础IO ===== :> 【软硬链接与动静态库】【深入理解文件系统】【模拟实现C语言文件流】【重定向及缓冲区理解】【文件理解与操作】 Linux进程控制 ===== :> 【简易版bash】【进程程序替换】【创建、终止、等待】 Linux进程学习 ===== :> 【进程地址】【环境变量】【进程状态】【基本认知】 Linux基础 ===== :> 【gdb】【git】【gcc/g++】【vim】Linux 权限理解和学习听说Linux基础指令很多?这里都帮你总结好了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-10-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 🌇前言
  • 🏙️正文
    • 1、生产者消费者模型
      • 1.1、什么是生产者消费者模型?
      • 1.2、生产者消费者模型的特点
      • 1.3、生产者消费者模型的优点
    • 2、基于阻塞队列实现生产者消费者模型
      • 2.1、阻塞队列
      • 2.2、单生产单消费模型
      • 2.3、多生产多消费模型
    • 3、POSIX 信号量
      • 3.1、信号量的基本知识
      • 3.2、信号量相关操作
    • 4、基于环形队列实现生产者消费者模型
      • 4.1、环形队列
      • 4.2、单生产单消费模型
      • 4.3、多生产多消费模型
  • 🌆总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档