前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >生产者-消费者模型C++多线程简单实现

生产者-消费者模型C++多线程简单实现

原创
作者头像
晨星成焰
发布2024-07-29 09:22:44
1222
发布2024-07-29 09:22:44
举报
文章被收录于专栏:C++入门基础知识

什么是生产者-消费者模型?

假设有两个线程(进程)A、B和一个容器

线程A生产完数据之后不用等待线程B消费处理,直接将生产的数据放到这个容器当中;消费者线程B也不用找生产者线程A索要数据,而是直接监听容器有无数据,有数据就取出消费。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力。

该模型的关键在于

消费者不会在缓冲区无数据时消耗数据。

若是容器有上限也要保证生产者不会在缓冲区满时加入数据。

以下是通过stl的queue队列做容器实现一个简单的生产者-消费者模型

代码实现

Task.h头文件

  • 首先我们需要一个类简单的模拟平常的任务需求
  • 通过new一个对象生成任务填充进待消费的队列
  • 运行Run以后视为消费了,delete掉之前new的对象
代码语言:cpp
复制
#pragma once
#include <iostream>
#include <functional>
class Task
{
private:
	int id;
public:
	Task(int id) :id(id)
	{
		std::cout << "生产:" << id << std::endl;
	}
	int GetId() { return id; }
	void SetId(int k) {
		id = k;
	}
	void Run()
	{
		std::cout << "消费:" << id << std::endl;
	}
};

main.cpp

该代码中生产者线程产生一系列 Task 对象并将其放入共享队列中,消费者线程从队列中取出这些任务并处理它们。

  • 预先定义的一些变量
    • #define QueueMaxBuffer 5 ,用于模拟容器容量达到上限时的情景,当达到上限时生产者等待一秒后再继续生产数据
    • #define ProduceMax 8 , 用于模拟生产者的最大生产任务量,当达到最大生产任务量退出循环
    • g_DataBuffer用于存储生产者产生的任务,并供消费者消费。
    • g_DataBufferMutex;用于保护对 g_DataBuffer 的访问
    • g_CondVar 一个条件变量,可以使消费者线程等待队列不为空,而生产者线程会在向队列添加新任务后通知等待的消费者。
    • g_ProducerDone 用于标记生产者是否完成了所有的任务生产。
  • 生产者线程函数:
    • 任务满了就等待一秒,再继续生产
    • 每生产一个任务通过notify_one() 方法来通知一个等待的消费者线程
    • 每100毫秒生产一个任务
  • 消费者线程函数:
    • 用 g_CondVar.wait() 使线程等待,直到队列非空或 生产者生产完成既g_ProducerDone 为 true。
    • 每400秒消费一个任务
    • 当消费完成,既队列为空且生产者生产完成时退出
代码语言:cpp
复制
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include "Task.h"

#define QueueMaxBuffer 5  // 队列最大容量
#define ProduceMax 8  //生产者最大生产任务量

std::queue<Task*> g_DataBuffer;
std::mutex g_DataBufferMutex;
std::condition_variable g_CondVar;
std::atomic<bool> g_ProducerDone(false);  // 生产者完成标志

void ProduceData()
{
	int value = 0;
	while (true)
	{
		// 这个线程等待一百毫秒
		std::this_thread::sleep_for(std::chrono::milliseconds(100));

		std::unique_lock<std::mutex> lock(g_DataBufferMutex);
		// 等待一秒直到队列未满
		if (g_DataBuffer.size() < QueueMaxBuffer);
		else
		{
			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
		}

		Task* t = new Task(value++);
		g_DataBuffer.push(t);

		// 通知消费者数据准备好了
		g_CondVar.notify_one();

		// 可以在这里改动退出逻辑
		if (value > ProduceMax)
		{
			std::cout << "Producer done, produced " << value - 1 << " tasks." << std::endl;
			g_ProducerDone.store(true);  // 设置生产者完成标志
			break;
		}
	}
}

void ConsumeData()
{
	while (true)
	{
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
		std::unique_lock<std::mutex> lock(g_DataBufferMutex);
		g_CondVar.wait(lock, [] { return !g_DataBuffer.empty() || g_ProducerDone.load(); });

		if (g_DataBuffer.empty() && g_ProducerDone.load())
			break;  // 如果队列为空且生产者已完成,则退出

		Task* data = g_DataBuffer.front();
		data->Run();
		g_DataBuffer.pop();
		delete data;

		std::cout << "Consumed one task, remaining tasks in queue: " << g_DataBuffer.size() << std::endl;
	}
}

int main()
{
	std::thread p(ProduceData);
	std::thread c(ConsumeData);

	// 等待生产者线程和消费者线程完成
	p.join();
	c.join();

	return 0;
}

运行截图

总结

写这篇文章时,碰了不少壁,线程之间的时序问题相当不熟练以至于出现bug时,会导致我难以定位,最后只能妥协用一些较为简单的代码代替感觉有问题的地方。

虽然简单的实现了一个多线程的生产者消费者模型,但缺点不少,等以后碰到具体的应用场景时,再来完善其内容吧。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是生产者-消费者模型?
    • 代码实现
      • Task.h头文件
      • main.cpp
    • 运行截图
    • 总结
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档