大家好,我是光城,在内容之前,举一个非常有趣的例子,如何实现一个线程安全的多生产多消费者对列呢?
如果让你手撕一个,可以写出来吗?里面有哪些细节?如何使用condition_variable?
完整代码如下:获取方式见文末。
引入之前,我们需要先写一个线程安全的队列,然后才能写出一个多生产多消费。
所以,第一个部分先写一个线程安全的队列,不考虑多生产多消费者情况。
通常,大家可能会想到很简单,就对STL的queue加个锁就完事吧?
如果是这么简单,我就不必赘述这么多了,之前在面试的时候也遇到过这么一个问题:面试官问我,如何保证empty线程安全,如何保证队列线程安全?等等,这不就是这里的问题嘛,如何写一个线程安全的队列?
我们先写出一个demo版,这个版本有问题并且通常是大家所熟知的写法,例如:
template <typename T>
class concurrent_queue {
private:
std::queue<T> q_;
mutable std::mutex m_;
public:
void push(const T& data) {
std::lock_guard<std::mutex> lock(m_);
q_.push(data);
}
bool empty() const {
std::lock_guard<std::mutex> lock(m_);
return q_.empty();
}
T& front() {
std::lock_guard<std::mutex> lock(m_);
return q_.front();
}
T const& front() const {
std::lock_guard<std::mutex> lock(m_);
return q_.front();
}
void pop() {
std::lock_guard<std::mutex> lock(m_);
q_.pop();
}
};
是不是看上去很easy,这里究竟存在什么问题呢?
我们来想象一个场景,如果有两个线程A、B,他们分别做了如下事情:
thread A内部逻辑是判断不为空,提取元素;thread B直接pop。
// thread A
while(!q.empty()) {
// do something
q.front();
}
// thread B
q.pop();
多线程场景下可能会存在thread A首先拿到锁然后判断不为空,此时队列非空,进入了while 内部,然后执行do something,此时锁被thread B持有了,B执行了pop操作,然后又回到A的front,此时A不知道B已经pop了元素,此时拿到的就有问题了。
那如何实现呢?
其实很简单,例如pop可以改为内部判断一下是否为空即可,如果为空,返回false,否则给外部的变量设置front的值即可。
bool try_front(T& fronted_value) {
std::unique_lock<std::mutex> lock(m_);
if (q_.empty()) {
return false;
}
fronted_value = q_.front();
return true;
}
此时,我们的queue的front、pop都是线程安全的,但是问题又来了,如何实现多生产多消费呢?
这里便引出两个问题:
所以我们需要在执行push的时候以某种方式通知消费者线程,这便解决了第一个问题,而第二个问题类似,需要进行等待。
到这里我们想到了cv,于是代码可以改写为:
std::queue<T> q_;
mutable std::mutex m_;
std::condition_variable cv_;
在push的时候,需要notify一下即可。
std::unique_lock<std::mutex> lock(m_);
q_.push(data);
lock.unlock();
cv_.notify_one();
对于生产者,在消费的时候,例如使用了pop即可,那么需要进行等待:
while (q_.empty()) {
cv_.wait(lock);
}
至此,我们便得到了一个线程安全且支持多生产多消费的队列!