前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【linux学习指南】⽣产者消费者模型

【linux学习指南】⽣产者消费者模型

作者头像
学习起来吧
发布2025-02-17 18:44:55
发布2025-02-17 18:44:55
3400
代码可运行
举报
文章被收录于专栏:学习C/++
运行总次数:0
代码可运行

📝 ⽣产者消费者模型

  • 321原则(便于记忆)

🌉为何要使⽤⽣产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。

🌉⽣产者消费者模型优点

  • 解耦
  • ⽀持并发
  • ⽀持忙闲不均

🌠 基于BlockingQueue的⽣产者消费者模型

在多线程编程中阻塞队列(BlockingQueue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

🌉 C++ queue模拟阻塞队列的⽣产消费模型

代码:

  • 为了便于同学们理解,我们以单⽣产者,单消费者,来进⾏讲解。
  • 刚开始写,我们采⽤原始接⼝
  • 我们先写单⽣产,单消费。然后改成多⽣产,多消费(这⾥代码其实不变)。

🌠strncat 函数的使⽤

🌉BlockQueue.hpp
  • 第一个版本
代码语言:javascript
代码运行次数:0
复制
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"

namespace BlockQueueModule
{
    using namespace LockModule;
    using namespace CondModule;

    static const int gcap = 10;

    // version 1 
    template<typename T>
    class BlockQueue
    {
    private:
        bool IsFull()
        {
            return _q.size() == _cap;
        }

        bool IsEmpty()
        {
            return _q.empty();
        }

    public:
        BlockQueue(int cap = gcap)
            :_cap(cap)
            ,_cwait_num(0)
            ,_pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_productor_cond, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
        }

        void EnQueue(const T& in) //生产者
        {
            pthread_mutex_lock(&_mutex);
            //你想放数据,就能放数据吗?生产数据是有条件的!
            //结论一:在临界区等待是必然的(目前)
            while(IsFull())//5,对条件进行判断, 为了防止伪唤醒, 我们通常使用while进行判断
            {
                std::cout<< "生产者进入等待..." <<std::endl;

                // 2,等是, 释放_mutex
                _pwait_num++;;
                pthread_cond_wait(&_productor_cond, &_mutex); //wait 的时候, 必定是持有锁的!!是有问题的
                _pwait_num--;
                //3,返回,线程被重新被唤醒&&重新持有锁(她会在临界区醒来!)

                std::cout<< "生产者被唤醒..." <<std::endl;
            }
            // 4,if(IsFull())不满足 || 线程被唤醒
            _q.push(in); //生产

            //肯定有数据
            if(_cwait_num)
            {
                std::cout<< "叫醒消费者" <<std::endl;
                pthread_cond_signal(&_consumer_cond);
            }
            pthread_mutex_unlock(&_mutex);

        }

        void Pop(T *out) //消费者
        {
            pthread_mutex_lock(&_mutex);
            while(IsEmpty())
            {
                std::cout<< "消费者进入等待..." <<std::endl;
                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex);
                _cwait_num--;
                std::cout<< "消费者被唤醒..." <<std::endl;
            }
            //4, if(IsEmpty())不满足 || 线程被唤醒
            *out = _q.front();
            _q.pop();

            //肯定有空间
            if(_pwait_num)
            {
                std::cout<< "叫醒生产者" <<std::endl;
                pthread_cond_signal(&_productor_cond);
            }

            pthread_mutex_unlock(&_mutex);
        }

        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }

    private:
        std::queue<T> _q;               //保存数据的容器,临界资源
        int _cap;                       //bq最大容量
        pthread_mutex_t _mutex;         //互斥
        pthread_cond_t _productor_cond; //生产者条件变量
        pthread_cond_t _consumer_cond;  //消费者条件变量

        int _cwait_num;
        int _pwait_num;
    };

}
🌉Mutex.hpp
代码语言:javascript
代码运行次数:0
复制
#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator= (const Mutex&) = delete;

        Mutex()
        {
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n;
        }

        ~Mutex()
        {
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n;
        }

        void Lock()
        {
            int n = ::pthread_mutex_lock(&_lock);
            (void)n;
        }

        pthread_mutex_t* LockPtr()
        {
            return &_lock;
        }

        void unLock()
        {
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n;
        }

    private:
        pthread_mutex_t _lock;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx):_mtx(mtx)
        {
            _mtx.Lock();
        }

        ~LockGuard()
        {
            _mtx.unLock();
        }

    private:
        Mutex &_mtx;
    };
}
🌉Cond.hpp
代码语言:javascript
代码运行次数:0
复制
#pragma once

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace LockModule;

    class Cond
    {
    public:
        Cond()
        {
            int n = ::pthread_cond_init(&_cond, nullptr);
            (void)n;
        }

        void Wait(Mutex& mutex)
        {
            int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
        }

        void Notify()
        {
            int n = ::pthread_cond_signal(&_cond);
            (void)n;
        }

        void MotifyAll()
        {
            int n = ::pthread_cond_broadcast(&_cond);
            (void)n;
        }


        ~Cond()
        {
            int n = ::pthread_cond_destroy(&_cond);
            (void)n;
        }
    private:
        pthread_cond_t _cond;
    };
}
🌉Makefile
代码语言:javascript
代码运行次数:0
复制
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)

$(bin):$(obj)
	$(cc) -o $@ $^ -lpthread
%.o:%.cc
	$(cc) -c $< -std=c++17

.PHONY:clean
clean:
	rm -f $(bin) $(obj)

.PHONY:test
	echo $(src)
	echo $(obj)	
🌉Main.cc
代码语言:javascript
代码运行次数:0
复制
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

using namespace BlockQueueModule;

void* Consumer(void* args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>* >(args);
    while(true)
    {
        int data;
        // 1.从bq拿到数据
        bq->Pop(&data);

        //2, 做处理
        printf("Consumer, 消费了一个数据:%d\n", data);
    }
}

void* Productor(void* args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>* >(args);
    int data = 10;
    while(true)
    {
        sleep(2);
        //1,从外部获取数据
        //data = 10;//有数据??

        // 2, 生产到bq中
        bq->EnQueue(data);

        printf("productor 生产了一个数据:%d\n", data);
        data++;

    }
}

int main()
{
    //交易场所, 不仅仅可以用来进行传递数据
    // 传递任务!!v1: 对象 version2
    BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
    //单生产, 单消费
    pthread_t c1, p1; //c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    pthread_join(p1, nullptr);

    delete bq;
    
    return 0;
}

多生产多消费:

代码语言:javascript
代码运行次数:0
复制
int main()
{
    //交易场所, 不仅仅可以用来进行传递数据
    // 传递任务!!v1: 对象 version2
    BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
    //单生产, 单消费
    pthread_t c1, p1 ,c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    pthread_create(&p2, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    

    delete bq;
    
    return 0;
}
🌉BlockQueue.hpp

第二个版本:主要进行封装

代码语言:javascript
代码运行次数:0
复制
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"

namespace BlockQueueModule
{
    using namespace LockModule;
    using namespace CondModule;

    static const int gcap = 10;

    template<typename T>
    class BlockQueue
    {
    private:
        bool IsFull() { return _q.size() == _cap; }
        bool IsEmpty() { return _q.empty(); }

    public:
        BlockQueue(int cap = gcap) 
            :_cap(cap)
            ,_cwait_num(0)
            ,_pwait_num(0)
        {   
        }

        void EnQueue(const T& in) //生产者
        {
            LockGuard lockguard(_mutex);
            //你想放数据就能放数据吗?? 生产数据是有条件的!
            // 结论1:在临界区中等待是必然的
            while(IsFull())
            {
                std::cout<< "生产者进入等待..." <<std::endl;

                //2, 等是, 释放_mutex
                ++_pwait_num;
                _productor_cond.Wait(_mutex);
                --_pwait_num;
                //返回线程被唤醒,&& 重新申请并持有锁(他会在临界区醒来)

                std::cout<< "生产者被唤醒..." <<std::endl;
            }

            _q.push(in);

            //肯定有数据
            if(_cwait_num)
            {
                std::cout << "叫醒消费者..." <<std::endl;
                _consumer_cond.Notify();
            }

        }

        void Pop(T* out)//消费者
        {
            LockGuard lockgurad(_mutex);
            while(IsEmpty())
            {
                std::cout<< "消费者进入等待..." << std::endl;

                ++_cwait_num;
                _consumer_cond.Wait(_mutex);
                --_cwait_num;

                std::cout<< "消费者被唤醒...." <<std::endl;
            }

            *out = _q.front();
            _q.pop();

            if(_pwait_num)
            {
                std::cout << "唤醒生产者..." <<std::endl;
                _productor_cond.Notify();
            }

        }

        ~BlockQueue()
        {
            _mutex.~Mutex();
            _consumer_cond.~Cond();
            _productor_cond.~Cond();
        }

        
    private:
        std::queue<T> _q;       //保存数据的容器, 临界资源
        int _cap;               //bq的最大容器
        Mutex _mutex;          //互斥

        Cond _productor_cond;  //生产者条件变量
        Cond _consumer_cond;   //消费者条件变量

        int _cwait_num;
        int _pwait_num;
    };

}

🌠Task任务

注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以防⽌内置类型,⽐如int,对象也可以作为任务来参与⽣产消费的过程哦.``

代码语言:javascript
代码运行次数:0
复制
#pragma once

#include <iostream>
#include <unistd.h>

namespace TaskModule
{
    class Task
    {
    public:
        Task() {}
        Task(int a, int b)
            :x(a)
            ,y(b)
        {}

        void Excute() const
        {
            sleep(1); //用1s来进行模拟任务处理的时长
            mutable_result = x + y;// 入库, 访问缓存, 访问网络, 打印日志等
        }

        int getX() const { return x;}
        int getY() const { return y; }
        int getResult()  const { return mutable_result; }

        ~Task()
        {}

    private:
        int x;
        int y;
        mutable int mutable_result;
    };
}

Main.cc:

代码语言:javascript
代码运行次数:0
复制
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <functional>

using task_t = std::function<void()>;

using namespace BlockQueueModule;
using namespace TaskModule;

void test()
{
    std::cout << "haha test..." << std::endl;
}

void hello()
{
    std::cout << "hehe hello" << std::endl;
}
// 消费者线程函数
void* Consumer(void* args) {
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
    while (true) {
        Task t;
        // 1, 从 bq 拿到数据
        bq->Pop(&t);
        // 2,做处理
        t.Excute();
        printf("Consumer, 消费了一个数据:%d + %d = %d\n", t.getX(), t.getY(), t.getResult());
        sleep(2); // 消费者消费速度变慢
    }
    return nullptr;
}
// 生产者线程函数
void* Productor(void* args) {
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
    while (true) {

        //1,从外部拿数据
        int x = rand() % 10 + 1; // [1, 10]
        int y = rand() % 20 + 1; // [1, 20]

        Task t(x, y); // 示例数据
        t.Excute();
        printf("productor 生产了一个数据: %d + %d = %d\n", t.getX(), t.getY(), t.getResult());
        bq->EnQueue(t);
        sleep(500000);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr) ^ getpid());
    //交易场所, 不仅仅可以用来进行传递数据
    // 传递任务!!v1: 对象 version2
    // BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
    BlockQueue<task_t> *bq = new BlockQueue<task_t>(5); //共享资源 -> 临界资源
    
    //单生产, 单消费
    pthread_t c1, p1;//,c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    // pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    // pthread_create(&p2, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    // pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    // pthread_join(p2, nullptr);
    

    delete bq;
    
    return 0;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 📝 ⽣产者消费者模型
    • 🌉为何要使⽤⽣产者消费者模型
    • 🌉⽣产者消费者模型优点
  • 🌠 基于BlockingQueue的⽣产者消费者模型
    • 🌉 C++ queue模拟阻塞队列的⽣产消费模型
  • 🌠strncat 函数的使⽤
    • 🌉BlockQueue.hpp
    • 🌉Mutex.hpp
    • 🌉Cond.hpp
    • 🌉Makefile
    • 🌉Main.cc
    • 🌉BlockQueue.hpp
  • 🌠Task任务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档