前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于BlockingQueue的生产者消费者模型

基于BlockingQueue的生产者消费者模型

作者头像
南桥
发布2024-08-24 13:49:13
440
发布2024-08-24 13:49:13
举报
文章被收录于专栏:南桥谈编程

文章目录

  • 引言
  • 理解生产者消费者模型
  • 基于BlockingQueue的生产者消费者模型
    • 单生产,单消费模型
    • 多生产、多消费模型

引言

生产者消费者模型一般可以在超市中听到,例如如下是一个专门卖方便面的超市,这个超市有自己供应商,也有客户来买,客户称之为消费者。超市起到一个缓存作用,供应商放假的时候,短时间内超市依然有对应的商品,消费者依然可以消费;相同的,如果短时间内消费者不来买东西,供应商依然可以供应给超市。也就是说,供应商生产产品比较慢,可以先生成一批产品放在超市中;供应商如果供应比较快,可以等消费者消费一段时间再去供应产品,协调忙线不均。现实生活中,在人口密集的地方肯定会有超市,生产者消费者模型效率高,有了超市这个巨大的缓存,可以使得消费者和生产者并发起来。

个别消费者不想买方便面不会影响到供应商,个别供应商出现了问题,不会影响消费者买方便面,这就做到了生产者和消费者的解耦

理解生产者消费者模型

上述例子对应到计算机中,供应商和消费者就是线程,超市是一段内存空间,方便面是数据。生产线程将数据交到一段内存空间中,消费线程从内存空间中将数据拿走。

“321原则”:

  1. 一个交易场所(特定数据结构的形式存在的一段内存空间)
  2. 两种角色:生产者、消费者,也就是生产线程和消费线程
  3. 三种关系:生产和生产(互斥关系)、消费和消费(互斥关系)、生产和消费(互斥关系、同步关系)

实现生产者消费者模型本质就是通过代码实现“321原则”,用锁和条件变量(或者其他形式)来实现三种关系。

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

单生产,单消费模型

代码语言:javascript
复制
//BlockQueue.hpp
#pragma once

#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>

const static int defaultcap=5;

template<typename T>
class BlockQueue
{
private:
    bool isFull()
    {
        return _block_queue.size()==_max_cap;
    }
    
    bool isEmpty()
    {
        return _block_queue.empty();
    }

public:
    BlockQueue(int cap= defaultcap):_max_cap(cap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_P_cond,nullptr);
        pthread_cond_init(&_C_cond,nullptr);
    }
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);

        while(isEmpty())
        {
            pthread_cond_wait(&_C_cond,&_mutex);
        }

        *out=_block_queue.front();
        _block_queue.pop();

        pthread_mutex_unlock(&_mutex);

        pthread_cond_signal(&_P_cond);  //唤醒生产者
    }
    void Equeue(const T &in)
    {
        pthread_mutex_lock(&_mutex);

        while(isFull()) //阻塞队列满
        {
            //满了生产者不能再生产,必须等待
            pthread_cond_wait(&_P_cond,&_mutex); //被调用的时候,除了让自己继续排队等待,还会释放自己传递的锁
            //函数返回时,会返回在临界区,必须先参与锁的竞争,重新加上锁,该函数才会返回,依然是持有锁的状态

        }
        
        //阻塞队列未满或者被唤醒
        _block_queue.push(in);  //生产数据到阻塞队列
        
        pthread_mutex_unlock(&_mutex);

        pthread_cond_signal(&_C_cond);  //唤醒消费者
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_P_cond);
        pthread_cond_destroy(&_C_cond);
    }

private:
    std::queue<T> _block_queue;  //临界资源
    int _max_cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _P_cond;  //生产者条件变量
    pthread_cond_t _C_cond;  //消费者条件变量
};

Pop 函数:从队列中取出元素,并将其存储在 out 指针指向的地址中。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列为空,使用 pthread_cond_wait(&_C_cond, &_mutex) 等待消费者条件变量被信号唤醒。
  • 取出元素:从队列中取出前面的元素,并将其弹出。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒生产者:使用 pthread_cond_signal(&_P_cond) 唤醒可能被阻塞的生产者线程。

Equeue 函数:将元素 in 插入队列。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列已满,使用 pthread_cond_wait(&_P_cond, &_mutex) 等待生产者条件变量被信号唤醒。
  • 插入元素:将新元素插入到队列中。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒消费者:使用 pthread_cond_signal(&_C_cond) 唤醒可能被阻塞的消费者线程。

为了体现阻塞队列的特点,分别设计了两种测试代码:

  1. 生产一个,消费一个
代码语言:javascript
复制
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        //获取数据
        int data=0;
        bq->Pop(&data);
        //处理数据
        std::cout<<"Coumer -> "<<data<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        sleep(2);
        //构建数据
        int data=rand()%10+1;  // [1,10]
        //生产数据
        bq->Equeue(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    BlockQueue<int> *bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}
  1. 先生产一批数据,直到队列开始阻塞,然后消费一个,生产一个
代码语言:javascript
复制
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        int data=0;
        bq->Pop(&data);
        //处理数据
        std::cout<<"Coumer -> "<<data<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        //构建数据
        int data=rand()%10+1;  // [1,10]
        //生产数据
        bq->Equeue(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    BlockQueue<int> *bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

上述测试代码是传递一个int类型的数据到阻塞队列中,也可以传递其他类型,在传递struct或者class类型时,可以封装成一个个的任务传递到阻塞队列中。

  • 传递任务:
代码语言:javascript
复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>

class Task
{

public:
    Task()
    {}
    Task(int x,int y):_x(x),_y(y)
    {}

    void Excute()
    {
        _result=_x+_y;
    }
    std::string debug()
    {
        std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"=?";
        return msg;
    }
    std::string result()
    {
        std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"="+std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};
代码语言:javascript
复制
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        Task t;
        bq->Pop(&t);
        // bq->Pop(&data);
        //处理数据
        t.Excute();
        std::cout<<"Coumer -> "<<t.result()<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        //构建数据
        int x=rand()%10+1;
        usleep(x*1000);
        int y=rand()%10+1;
        Task t(x,y);
        //生产数据
        bq->Equeue(t);
        std::cout<<"Productor -> "<<t.debug()<<std::endl;
    }
}

int main()
{
    BlockQueue<Task> *bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}
  • 传递函数任务:
代码语言:javascript
复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>

using task_t=std::function<void()>;

void Download()
{
    std::cout<<"I am Download task"<<std::endl;
}
代码语言:javascript
复制
//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        task_t t;
        bq->Pop(&t);
        //处理数据
        t();

    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        bq->Equeue(Download);
        std::cout<<"Productor -> Download "<<std::endl;
    }
}

int main()
{
    BlockQueue<task_t> *bq=new BlockQueue<task_t>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

多生产、多消费模型

创建两个消费者线程 c1c2,它们会并行地从队列中取出任务并处理。创建三个生产者线程 p1p2p3,它们会并行地将任务放入队列中。

代码语言:javascript
复制
//main.cc

#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        //获取数据
        task_t t;
        bq->Pop(&t);
        t();

    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        bq->Equeue(Download);
        std::cout<<"Productor -> Download "<<std::endl;
        sleep(1);

    }
}

int main()
{
    BlockQueue<task_t> *bq=new BlockQueue<task_t>();
    pthread_t c1,c2,p1,p2,p3;
    pthread_create(&c1,nullptr,Consumer,bq);
    pthread_create(&c2,nullptr,Consumer,bq);
    pthread_create(&p1,nullptr,Productor,bq);
    pthread_create(&p1,nullptr,Productor,bq);
    pthread_create(&p3,nullptr,Productor,bq);

    pthread_join(c1,nullptr);
    pthread_join(c2,nullptr);
    pthread_join(p1,nullptr);
    pthread_join(p2,nullptr);
    pthread_join(p3,nullptr);

    return 0;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
    • 引言
      • 理解生产者消费者模型
        • 基于BlockingQueue的生产者消费者模型
          • 单生产,单消费模型
          • 多生产、多消费模型
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档