假设有两个线程(进程)A、B和一个容器
线程A生产完数据之后不用等待线程B消费处理,直接将生产的数据放到这个容器当中;消费者线程B也不用找生产者线程A索要数据,而是直接监听容器有无数据,有数据就取出消费。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力。
该模型的关键在于
消费者不会在缓冲区无数据时消耗数据。
若是容器有上限也要保证生产者不会在缓冲区满时加入数据。
以下是通过stl的queue队列做容器实现一个简单的生产者-消费者模型
#pragma once
#include <iostream>
#include <functional>
class Task
{
private:
int id;
public:
Task(int id) :id(id)
{
std::cout << "生产:" << id << std::endl;
}
int GetId() { return id; }
void SetId(int k) {
id = k;
}
void Run()
{
std::cout << "消费:" << id << std::endl;
}
};
该代码中生产者线程产生一系列 Task 对象并将其放入共享队列中,消费者线程从队列中取出这些任务并处理它们。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include "Task.h"
#define QueueMaxBuffer 5 // 队列最大容量
#define ProduceMax 8 //生产者最大生产任务量
std::queue<Task*> g_DataBuffer;
std::mutex g_DataBufferMutex;
std::condition_variable g_CondVar;
std::atomic<bool> g_ProducerDone(false); // 生产者完成标志
void ProduceData()
{
int value = 0;
while (true)
{
// 这个线程等待一百毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(g_DataBufferMutex);
// 等待一秒直到队列未满
if (g_DataBuffer.size() < QueueMaxBuffer);
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Task* t = new Task(value++);
g_DataBuffer.push(t);
// 通知消费者数据准备好了
g_CondVar.notify_one();
// 可以在这里改动退出逻辑
if (value > ProduceMax)
{
std::cout << "Producer done, produced " << value - 1 << " tasks." << std::endl;
g_ProducerDone.store(true); // 设置生产者完成标志
break;
}
}
}
void ConsumeData()
{
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::unique_lock<std::mutex> lock(g_DataBufferMutex);
g_CondVar.wait(lock, [] { return !g_DataBuffer.empty() || g_ProducerDone.load(); });
if (g_DataBuffer.empty() && g_ProducerDone.load())
break; // 如果队列为空且生产者已完成,则退出
Task* data = g_DataBuffer.front();
data->Run();
g_DataBuffer.pop();
delete data;
std::cout << "Consumed one task, remaining tasks in queue: " << g_DataBuffer.size() << std::endl;
}
}
int main()
{
std::thread p(ProduceData);
std::thread c(ConsumeData);
// 等待生产者线程和消费者线程完成
p.join();
c.join();
return 0;
}
写这篇文章时,碰了不少壁,线程之间的时序问题相当不熟练以至于出现bug时,会导致我难以定位,最后只能妥协用一些较为简单的代码代替感觉有问题的地方。
虽然简单的实现了一个多线程的生产者消费者模型,但缺点不少,等以后碰到具体的应用场景时,再来完善其内容吧。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。