先了解问题背景:
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用线程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
问题描述:生产者不断生产数据,每包数据有优先级及时间戳等属性,当队列满时,移除时间最迟的数据,并将新数据放置队列头。
通过锁与条件变量进行线程同步,下面通过代码进行说明。
一、数据结构
typedef struct stc_RcvInfo{ int32_t priority; //优先级属性 int64_t u64InTime; //进入队列时间(us) int32_t nBuffLen; //数据长度 int8_t acBuff[DATA_LEN]; //数据缓存} tRcvInfo;
二、队列函数设计——入队、出队、锁、条件变量
1、 数据结构
typedef int PRIORITY; //0 is highest leveltypedef std::vector<std::shared_ptr<tRcvInfo>> MSG_QUEUE; //存放数据的容器
static std::mutex mtx; //本队列使用的锁static std::condition_variable cv; //本队列与锁对应的条件变//数据优先级与Vector关联的MAPstatic std::map<PRIORITY, MSG_QUEUE> PriorityQueue{ {0, std::vector<std::shared_ptr<tRcvInfo>>{}}, {1, std::vector<std::shared_ptr<tRcvInfo>>{}}, {2, std::vector<std::shared_ptr<tRcvInfo>>{}}, {3, std::vector<std::shared_ptr<tRcvInfo>>{}}, {4, std::vector<std::shared_ptr<tRcvInfo>>{}}, {5, std::vector<std::shared_ptr<tRcvInfo>>{}}, {6, std::vector<std::shared_ptr<tRcvInfo>>{}}, {7, std::vector<std::shared_ptr<tRcvInfo>>{}}};
static int indexArray[] = {0, 1, 2, 3, 4, 5, 6, 7};static int ArraySize = sizeof(indexArray) / sizeof(indexArray[0]);
2、 向队列中以一定规则插入数据
int PushDataIntoCommunicateQueue(std::shared_ptr<tRcvInfo> pdata){ { std::unique_lock<std::mutex> lk(mtx); //上锁 try {//判断所在优先级对应的队列是否已满,队列大小通过宏来控制 if (PriorityQueue.at(pdata->priority).size() < COMMUNICATE_QUEUE_SIZE) //未满 { PriorityQueue.at(pdata->priority).push_back(pdata); //按顺序入队 } else //已满,删除第一个元素,即时间戳最晚元素 ,将数据放入第一个位置 { PriorityQueue.at(pdata->priority).erase(PriorityQueue.at(pdata->priority).begin()); PriorityQueue.at(pdata->priority).insert(PriorityQueue.at(pdata->priority).begin(), pdata); }
int nSta, nSize; nSize = PriorityQueue.at(pdata->priority).size(); nSta = nSize >= COMMUNICATE_QUEUE_SIZE ? 1 : 0; // if (nSta == 1) //输出当前队列大小 // { // printf("Queue nSta = %d nSize = %d\n", nSta, nSize); // std::cout << "thread id=" << std::this_thread::get_id() << std::endl; //} } catch (const std::exception &e) { std::cerr << e.what() << std::endl; return -1; } } cv.notify_all(); //完成操作,进行唤醒
return 0;}
3、 从队列中取数据
std::shared_ptr<tRcvInfo> GetDataFromCommunicateQueue(){ std::unique_lock<std::mutex> lk(mtx); //上锁//条件变量等待固定时间或谓词为真 cv.wait_for(lk, std::chrono::microseconds(5), []() { for (auto &it : PriorityQueue) { if (!it.second.empty()) { return true; } } return false; });
std::shared_ptr<tRcvInfo> pdata = NULL;
for (int i = 0; i < ArraySize; i++) { if (!PriorityQueue.at(indexArray[i]).empty()) { pdata = PriorityQueue.at(indexArray[i]).front(); //从对应优先级中取出数据 PriorityQueue.at(indexArray[i]).erase(PriorityQueue.at(indexArray[i]).begin()); // break; } }
return pdata;}
三、函数调用
两个生产者,一个消费者。
1、生产者
static void *ProducerOne(void *pArg){ uint8_t u8testData[512] = {0}; memset(u8testData, 0x66, sizeof(u8testData));
int mark = *((int *)pArg); printf("I am is %d thread child \n ", mark);
while (1) { std::shared_ptr<tRcvInfo> pRcvInfo = std::make_shared<tRcvInfo>(); if (NULL == pRcvInfo) { printf("pRcvInfo calloc error!\n"); break; }
pRcvInfo->priority = 0; //构造数据 memcpy(pRcvInfo->acBuff, u8testData, sizeof(u8testData)); pRcvInfo->nBuffLen = sizeof(u8testData); pRcvInfo->u64InTime = current_time_us_get();
PushDataIntoCommunicateQueue(pRcvInfo); //将数据入队,且无需延时 }
(void)pthread_exit(NULL);}
2、消费者
static void *Consumer(void *pArg){ int mark = *((int *)pArg); printf("I am is %d thread child \n ", mark);
while (1) { std::shared_ptr<tRcvInfo> pRcvInfo = GetDataFromCommunicateQueue(); //获取数据,无需延时 if (pRcvInfo == NULL) { continue; }
printf("priority : %d \n", pRcvInfo->priority); }
(void)pthread_exit(NULL);}
四、小结
问题点一个是线程间同步问题——使用锁和条件变量
还有就是对谓词判断上——延时时间到,或者队列不为空谓词为真,程序皆可向下执行,跳出阻塞。
cpu 占用率