前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C++生产者与消费者多线程样例

C++生产者与消费者多线程样例

作者头像
用户5908113
发布2021-01-06 11:39:13
7440
发布2021-01-06 11:39:13
举报
文章被收录于专栏:Pou光明Pou光明

先了解问题背景:

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用线程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

问题描述:生产者不断生产数据,每包数据有优先级及时间戳等属性,当队列满时,移除时间最迟的数据,并将新数据放置队列头。

通过锁与条件变量进行线程同步,下面通过代码进行说明。

一、数据结构

代码语言:javascript
复制
typedef struct stc_RcvInfo{    int32_t priority;         //优先级属性    int64_t u64InTime;       //进入队列时间(us)    int32_t nBuffLen;        //数据长度    int8_t acBuff[DATA_LEN]; //数据缓存} tRcvInfo;

二、队列函数设计——入队、出队、锁、条件变量

1、 数据结构

代码语言:javascript
复制
typedef int PRIORITY;   //0 is highest leveltypedef std::vector<std::shared_ptr<tRcvInfo>> MSG_QUEUE;  //存放数据的容器
static std::mutex mtx;                                  //本队列使用的锁static std::condition_variable cv;                 //本队列与锁对应的条件变//数据优先级与Vector关联的MAPstatic std::map<PRIORITY, MSG_QUEUE> PriorityQueue{      {0, std::vector<std::shared_ptr<tRcvInfo>>{}},    {1, std::vector<std::shared_ptr<tRcvInfo>>{}},    {2, std::vector<std::shared_ptr<tRcvInfo>>{}},    {3, std::vector<std::shared_ptr<tRcvInfo>>{}},    {4, std::vector<std::shared_ptr<tRcvInfo>>{}},    {5, std::vector<std::shared_ptr<tRcvInfo>>{}},    {6, std::vector<std::shared_ptr<tRcvInfo>>{}},    {7, std::vector<std::shared_ptr<tRcvInfo>>{}}};
static int indexArray[] = {0, 1, 2, 3, 4, 5, 6, 7};static int ArraySize = sizeof(indexArray) / sizeof(indexArray[0]);

2、 向队列中以一定规则插入数据

代码语言:javascript
复制
int PushDataIntoCommunicateQueue(std::shared_ptr<tRcvInfo> pdata){    {        std::unique_lock<std::mutex> lk(mtx);                  //上锁        try        {//判断所在优先级对应的队列是否已满,队列大小通过宏来控制            if (PriorityQueue.at(pdata->priority).size() < COMMUNICATE_QUEUE_SIZE)  //未满            {                PriorityQueue.at(pdata->priority).push_back(pdata);               //按顺序入队            }            else          //已满,删除第一个元素,即时间戳最晚元素 ,将数据放入第一个位置            {                PriorityQueue.at(pdata->priority).erase(PriorityQueue.at(pdata->priority).begin());                PriorityQueue.at(pdata->priority).insert(PriorityQueue.at(pdata->priority).begin(), pdata);            }
            int nSta, nSize;            nSize = PriorityQueue.at(pdata->priority).size();            nSta = nSize >= COMMUNICATE_QUEUE_SIZE ? 1 : 0;            // if (nSta == 1)                                       //输出当前队列大小            // {            //     printf("Queue nSta = %d  nSize = %d\n", nSta, nSize);            //     std::cout << "thread id=" << std::this_thread::get_id() << std::endl;            //}        }            catch (const std::exception &e)        {            std::cerr << e.what() << std::endl;            return -1;        }    }    cv.notify_all();                          //完成操作,进行唤醒
    return 0;}

3、 从队列中取数据

代码语言:javascript
复制
std::shared_ptr<tRcvInfo> GetDataFromCommunicateQueue(){    std::unique_lock<std::mutex> lk(mtx);               //上锁//条件变量等待固定时间或谓词为真    cv.wait_for(lk, std::chrono::microseconds(5), []() {        for (auto &it : PriorityQueue)        {            if (!it.second.empty())            {                return true;            }        }        return false;    });
    std::shared_ptr<tRcvInfo> pdata = NULL;
    for (int i = 0; i < ArraySize; i++)    {        if (!PriorityQueue.at(indexArray[i]).empty())        {            pdata = PriorityQueue.at(indexArray[i]).front();    //从对应优先级中取出数据            PriorityQueue.at(indexArray[i]).erase(PriorityQueue.at(indexArray[i]).begin());  //            break;        }    }
    return pdata;}

三、函数调用

两个生产者,一个消费者。

1、生产者

代码语言:javascript
复制
static void *ProducerOne(void *pArg){    uint8_t u8testData[512] = {0};    memset(u8testData, 0x66, sizeof(u8testData));     
    int mark = *((int *)pArg);    printf("I am is %d thread child \n ", mark);
    while (1)    {        std::shared_ptr<tRcvInfo> pRcvInfo = std::make_shared<tRcvInfo>();        if (NULL == pRcvInfo)        {            printf("pRcvInfo calloc error!\n");            break;        }
        pRcvInfo->priority = 0;                                    //构造数据        memcpy(pRcvInfo->acBuff, u8testData, sizeof(u8testData));        pRcvInfo->nBuffLen = sizeof(u8testData);        pRcvInfo->u64InTime = current_time_us_get();
        PushDataIntoCommunicateQueue(pRcvInfo);                 //将数据入队,且无需延时    }
    (void)pthread_exit(NULL);}

2、消费者

代码语言:javascript
复制
static void *Consumer(void *pArg){    int mark = *((int *)pArg);    printf("I am is %d thread child \n ", mark);
    while (1)    {        std::shared_ptr<tRcvInfo> pRcvInfo = GetDataFromCommunicateQueue();    //获取数据,无需延时        if (pRcvInfo == NULL)        {            continue;        }
        printf("priority : %d \n", pRcvInfo->priority);    }
    (void)pthread_exit(NULL);}

四、小结

问题点一个是线程间同步问题——使用锁和条件变量

还有就是对谓词判断上——延时时间到,或者队列不为空谓词为真,程序皆可向下执行,跳出阻塞。

cpu 占用率

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Pou光明 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档