⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。
在多线程编程中阻塞队列(BlockingQueue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
代码:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
namespace BlockQueueModule
{
using namespace LockModule;
using namespace CondModule;
static const int gcap = 10;
// version 1
template<typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() == _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int cap = gcap)
:_cap(cap)
,_cwait_num(0)
,_pwait_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_productor_cond, nullptr);
pthread_cond_init(&_consumer_cond, nullptr);
}
void EnQueue(const T& in) //生产者
{
pthread_mutex_lock(&_mutex);
//你想放数据,就能放数据吗?生产数据是有条件的!
//结论一:在临界区等待是必然的(目前)
while(IsFull())//5,对条件进行判断, 为了防止伪唤醒, 我们通常使用while进行判断
{
std::cout<< "生产者进入等待..." <<std::endl;
// 2,等是, 释放_mutex
_pwait_num++;;
pthread_cond_wait(&_productor_cond, &_mutex); //wait 的时候, 必定是持有锁的!!是有问题的
_pwait_num--;
//3,返回,线程被重新被唤醒&&重新持有锁(她会在临界区醒来!)
std::cout<< "生产者被唤醒..." <<std::endl;
}
// 4,if(IsFull())不满足 || 线程被唤醒
_q.push(in); //生产
//肯定有数据
if(_cwait_num)
{
std::cout<< "叫醒消费者" <<std::endl;
pthread_cond_signal(&_consumer_cond);
}
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) //消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
std::cout<< "消费者进入等待..." <<std::endl;
_cwait_num++;
pthread_cond_wait(&_consumer_cond, &_mutex);
_cwait_num--;
std::cout<< "消费者被唤醒..." <<std::endl;
}
//4, if(IsEmpty())不满足 || 线程被唤醒
*out = _q.front();
_q.pop();
//肯定有空间
if(_pwait_num)
{
std::cout<< "叫醒生产者" <<std::endl;
pthread_cond_signal(&_productor_cond);
}
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
private:
std::queue<T> _q; //保存数据的容器,临界资源
int _cap; //bq最大容量
pthread_mutex_t _mutex; //互斥
pthread_cond_t _productor_cond; //生产者条件变量
pthread_cond_t _consumer_cond; //消费者条件变量
int _cwait_num;
int _pwait_num;
};
}
#pragma once
#include <iostream>
#include <pthread.h>
namespace LockModule
{
class Mutex
{
public:
Mutex(const Mutex&) = delete;
const Mutex& operator= (const Mutex&) = delete;
Mutex()
{
int n = ::pthread_mutex_init(&_lock, nullptr);
(void)n;
}
~Mutex()
{
int n = ::pthread_mutex_destroy(&_lock);
(void)n;
}
void Lock()
{
int n = ::pthread_mutex_lock(&_lock);
(void)n;
}
pthread_mutex_t* LockPtr()
{
return &_lock;
}
void unLock()
{
int n = ::pthread_mutex_unlock(&_lock);
(void)n;
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex &mtx):_mtx(mtx)
{
_mtx.Lock();
}
~LockGuard()
{
_mtx.unLock();
}
private:
Mutex &_mtx;
};
}
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
namespace CondModule
{
using namespace LockModule;
class Cond
{
public:
Cond()
{
int n = ::pthread_cond_init(&_cond, nullptr);
(void)n;
}
void Wait(Mutex& mutex)
{
int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
}
void Notify()
{
int n = ::pthread_cond_signal(&_cond);
(void)n;
}
void MotifyAll()
{
int n = ::pthread_cond_broadcast(&_cond);
(void)n;
}
~Cond()
{
int n = ::pthread_cond_destroy(&_cond);
(void)n;
}
private:
pthread_cond_t _cond;
};
}
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)
$(bin):$(obj)
$(cc) -o $@ $^ -lpthread
%.o:%.cc
$(cc) -c $< -std=c++17
.PHONY:clean
clean:
rm -f $(bin) $(obj)
.PHONY:test
echo $(src)
echo $(obj)
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
using namespace BlockQueueModule;
void* Consumer(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>* >(args);
while(true)
{
int data;
// 1.从bq拿到数据
bq->Pop(&data);
//2, 做处理
printf("Consumer, 消费了一个数据:%d\n", data);
}
}
void* Productor(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>* >(args);
int data = 10;
while(true)
{
sleep(2);
//1,从外部获取数据
//data = 10;//有数据??
// 2, 生产到bq中
bq->EnQueue(data);
printf("productor 生产了一个数据:%d\n", data);
data++;
}
}
int main()
{
//交易场所, 不仅仅可以用来进行传递数据
// 传递任务!!v1: 对象 version2
BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
//单生产, 单消费
pthread_t c1, p1; //c2, p2, p3;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(p1, nullptr);
delete bq;
return 0;
}
多生产多消费:
int main()
{
//交易场所, 不仅仅可以用来进行传递数据
// 传递任务!!v1: 对象 version2
BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
//单生产, 单消费
pthread_t c1, p1 ,c2, p2, p3;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_create(&p2, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
delete bq;
return 0;
}
第二个版本:主要进行封装
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
namespace BlockQueueModule
{
using namespace LockModule;
using namespace CondModule;
static const int gcap = 10;
template<typename T>
class BlockQueue
{
private:
bool IsFull() { return _q.size() == _cap; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = gcap)
:_cap(cap)
,_cwait_num(0)
,_pwait_num(0)
{
}
void EnQueue(const T& in) //生产者
{
LockGuard lockguard(_mutex);
//你想放数据就能放数据吗?? 生产数据是有条件的!
// 结论1:在临界区中等待是必然的
while(IsFull())
{
std::cout<< "生产者进入等待..." <<std::endl;
//2, 等是, 释放_mutex
++_pwait_num;
_productor_cond.Wait(_mutex);
--_pwait_num;
//返回线程被唤醒,&& 重新申请并持有锁(他会在临界区醒来)
std::cout<< "生产者被唤醒..." <<std::endl;
}
_q.push(in);
//肯定有数据
if(_cwait_num)
{
std::cout << "叫醒消费者..." <<std::endl;
_consumer_cond.Notify();
}
}
void Pop(T* out)//消费者
{
LockGuard lockgurad(_mutex);
while(IsEmpty())
{
std::cout<< "消费者进入等待..." << std::endl;
++_cwait_num;
_consumer_cond.Wait(_mutex);
--_cwait_num;
std::cout<< "消费者被唤醒...." <<std::endl;
}
*out = _q.front();
_q.pop();
if(_pwait_num)
{
std::cout << "唤醒生产者..." <<std::endl;
_productor_cond.Notify();
}
}
~BlockQueue()
{
_mutex.~Mutex();
_consumer_cond.~Cond();
_productor_cond.~Cond();
}
private:
std::queue<T> _q; //保存数据的容器, 临界资源
int _cap; //bq的最大容器
Mutex _mutex; //互斥
Cond _productor_cond; //生产者条件变量
Cond _consumer_cond; //消费者条件变量
int _cwait_num;
int _pwait_num;
};
}
注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以防⽌内置类型,⽐如int,对象也可以作为任务来参与⽣产消费的过程哦.``
#pragma once
#include <iostream>
#include <unistd.h>
namespace TaskModule
{
class Task
{
public:
Task() {}
Task(int a, int b)
:x(a)
,y(b)
{}
void Excute() const
{
sleep(1); //用1s来进行模拟任务处理的时长
mutable_result = x + y;// 入库, 访问缓存, 访问网络, 打印日志等
}
int getX() const { return x;}
int getY() const { return y; }
int getResult() const { return mutable_result; }
~Task()
{}
private:
int x;
int y;
mutable int mutable_result;
};
}
Main.cc:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <functional>
using task_t = std::function<void()>;
using namespace BlockQueueModule;
using namespace TaskModule;
void test()
{
std::cout << "haha test..." << std::endl;
}
void hello()
{
std::cout << "hehe hello" << std::endl;
}
// 消费者线程函数
void* Consumer(void* args) {
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while (true) {
Task t;
// 1, 从 bq 拿到数据
bq->Pop(&t);
// 2,做处理
t.Excute();
printf("Consumer, 消费了一个数据:%d + %d = %d\n", t.getX(), t.getY(), t.getResult());
sleep(2); // 消费者消费速度变慢
}
return nullptr;
}
// 生产者线程函数
void* Productor(void* args) {
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while (true) {
//1,从外部拿数据
int x = rand() % 10 + 1; // [1, 10]
int y = rand() % 20 + 1; // [1, 20]
Task t(x, y); // 示例数据
t.Excute();
printf("productor 生产了一个数据: %d + %d = %d\n", t.getX(), t.getY(), t.getResult());
bq->EnQueue(t);
sleep(500000);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
//交易场所, 不仅仅可以用来进行传递数据
// 传递任务!!v1: 对象 version2
// BlockQueue<int> *bq = new BlockQueue<int>(5); //共享资源 -> 临界资源
BlockQueue<task_t> *bq = new BlockQueue<task_t>(5); //共享资源 -> 临界资源
//单生产, 单消费
pthread_t c1, p1;//,c2, p2, p3;
pthread_create(&c1, nullptr, Consumer, bq);
// pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
// pthread_create(&p2, nullptr, Productor, bq);
pthread_join(c1, nullptr);
// pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
// pthread_join(p2, nullptr);
delete bq;
return 0;
}