我对无锁编程世界非常陌生,希望能得到一些关于这段代码的反馈。我一直在用多个生产者和消费者测试队列,并且没有意外的输出(目前为止)。有什么我可能错过或者应该改进的地方吗?
template <typename Type, std::size_t Capacity = 100>
struct concurrent_queue
{
static_assert(Capacity > 2, "not enough room in queue!");
bool try_push(Type const & object) const
{
auto past_writer = writer.load();
auto next_writer = increment(past_writer);
if (next_writer != reader.load())
{
if (writer.compare_exchange_weak(past_writer, next_writer))
{
array[past_writer] = object;
return true;
}
}
return false;
}
bool try_push(Type & object) const
{
auto past_writer = writer.load();
auto next_writer = increment(past_writer);
if (next_writer != reader.load())
{
if (writer.compare_exchange_weak(past_writer, next_writer))
{
array[past_writer] = std::move_if_noexcept(object);
return true;
}
}
return false;
}
bool try_pop(Type & object) const
{
auto past_reader = reader.load();
auto next_reader = increment(past_reader);
if (past_reader != writer.load())
{
if (reader.compare_exchange_weak(past_reader, next_reader))
{
object = std::move_if_noexcept(array[past_reader]);
return true;
}
}
return false;
}
bool empty() const
{
return reader.load() == writer.load();
}
private:
std::size_t increment(std::size_t index) const
{
return (index + 1) % Capacity;
}
mutable std::atomic_size_t reader { 0 };
mutable std::atomic_size_t writer { 0 };
mutable std::array<Type, Capacity> array;
};
发布于 2015-10-12 01:03:45
欢迎来到代码评审。这个队列实现并不是真正的并发就绪。
template <typename Type, std::size_t Capacity = 100>
为什么您决定默认大小应该是100?不同的人有不同的需求;拥有这样的默认值并不是一个好主意,因为默认大小为100并没有真正的优势。
虽然这是主观的,但我建议您删除默认大小,只要求用户指定他们想要的大小。
在使用比较/交换方面似乎存在误解。比较/交换操作的工作如下:
这些操作具有以下签名:
X.compare_exchange(E, N);
这大致将(但不是比较/交换操作的实现)转换为:
bool compare_exchange(E, N)
{
if( X == E )
{
X = N;
return true;
}
else
{
E = X;
return false;
}
}
这意味着,当比较失败时,E被更新为X的当前值。如果比较成功,X将被更新为N的值,因此,您不需要再次更新X!这就是比较/交换操作的全部要点;您基本上是在说:“我是最后一个修改X的人。”
的不同步访问
在代码中,使用上次从比较/交换操作中执行的更新获得的值的副本对原子变量执行存储,但这不是原子性的。
if (next_writer != reader.load())
{
if (writer.compare_exchange_weak(past_writer, next_writer))
{
// thread can be stalled here and try_pop() will access garbage memory
// since at this point writer != reader
array[past_writer] = object;
return true;
}
}
假设在线程A中对try_push()
的第一次调用是在if (next_writer != reader.load())
语句中进行的,并且在执行array[past_writer] = object;
之前就停止了。现在,另一个线程B调用try_pop()
并成功地通过了if (past_reader != writer.load())
;由于比较/交换操作,写入程序现在的值与读取器不同。
但是,由于线程A仍然处于停滞状态,元素的存储还没有发生。B线程会弹出什么?它将访问那个位置的任何垃圾内存。
来自reader
的负载和从writer
加载后的比较都不是线程安全的。
bool empty() const
{
return reader.load() == writer.load();
}
考虑一下如果writer.load()
返回值A会发生什么,但是在计算表达式之前,writer
被更改为其他值B。这可能导致empty()
说队列不是空的,或者当它实际上是空的时候。
这是当前代码中存在的许多问题中的两个。我决定在这里停下来,因为这是个很严重的问题。这个当前的实现,如果根本不安全的话。我建议您更多地阅读和学习线程交互。祝好运。
https://codereview.stackexchange.com/questions/107254
复制相似问题