首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >C++11中无锁的多生产者/消费者队列

C++11中无锁的多生产者/消费者队列
EN

Stack Overflow用户
提问于 2014-09-07 19:08:51
回答 4查看 32.3K关注 0票数 14

我正在尝试在C++11中实现一个无锁的多生产者、多消费者队列。我这样做是为了学习练习,所以我很清楚我可以使用现有的开源实现,但我真的想找出我的代码不能工作的原因。数据存储在环形缓冲区中,显然这是一个“有界MPMC队列”。

我对它进行了建模,非常接近于我所读到的Disruptor。我注意到的一件事是,它在单个消费者和单个/多个生产者的情况下工作得非常好,只是多个消费者似乎破坏了它。

队列如下:

代码语言:javascript
运行
复制
    template <typename T>
class Queue : public IQueue<T>
{
public:
    explicit Queue( int capacity );
    ~Queue();

    bool try_push( T value );
    bool try_pop( T& value );
private:
    typedef struct
    {
        bool readable;
        T value;
    } Item;

    std::atomic<int> m_head;
    std::atomic<int> m_tail;
    int m_capacity;
    Item* m_items;
};

template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
    for( int i = 0; i < capacity; ++i )
    {
        m_items[i].readable = false;
    }
}

template <typename T>
Queue<T>::~Queue()
{
    delete[] m_items;
}

template <typename T>
bool Queue<T>::try_push( T value )
{
    while( true )
    {
        // See that there's room
        int tail = m_tail.load(std::memory_order_acquire);
        int new_tail = ( tail + 1 );
        int head = m_head.load(std::memory_order_acquire);

        if( ( new_tail - head ) >= m_capacity )
        {
            return false;
        }

        if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
        {
            // In try_pop, m_head is incremented before the reading of the value has completed,
            // so though we've acquired this slot, a consumer thread may be in the middle of reading
            tail %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( m_items[tail].readable )
            {
            }

            m_items[tail].value = value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[tail].readable = true;

            return true;
        }
    }
}

template <typename T>
bool Queue<T>::try_pop( T& value )
{
    while( true )
    {
        int head = m_head.load(std::memory_order_acquire);
        int tail = m_tail.load(std::memory_order_acquire);

        if( head == tail )
        {
            return false;
        }

        int new_head = ( head + 1 );

        if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
        {
            head %= m_capacity;

            std::atomic_thread_fence( std::memory_order_acquire );
            while( !m_items[head].readable )
            {
            }

            value = m_items[head].value;
            std::atomic_thread_fence( std::memory_order_release );
            m_items[head].readable = false;

            return true;
        }
    }
}

下面是我使用的测试:

代码语言:javascript
运行
复制
void Test( std::string name, Queue<int>& queue )
{
    const int NUM_PRODUCERS = 64;
    const int NUM_CONSUMERS = 2;
    const int NUM_ITERATIONS = 512;
    bool table[NUM_PRODUCERS*NUM_ITERATIONS];
    memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));

    std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);

    std::chrono::system_clock::time_point start, end;
    start = std::chrono::system_clock::now();

    std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
    std::atomic<int> push_count (0);

    for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
    {
        threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
                                 {
                                     int base = thread_id * NUM_ITERATIONS;

                                     for( int i = 0; i < NUM_ITERATIONS; ++i )
                                     {
                                         while( !queue.try_push( base + i ) ){};
                                         push_count.fetch_add(1);
                                     }
                                 });
    }

    for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
    {
        threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
                                         {
                                             int v;

                                             while( pop_count.load() > 0 )
                                             {
                                                 if( queue.try_pop( v ) )
                                                 {
                                                     if( table[v] )
                                                     {
                                                         std::cout << v << " already set" << std::endl;
                                                     }
                                                     table[v] = true;
                                                     pop_count.fetch_sub(1);
                                                 }
                                             }
                                         });

    }

    for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
    {
        threads[i].join();
    }

    end = std::chrono::system_clock::now();
    std::chrono::duration<double> duration = end - start;

    std::cout << name << " " << duration.count() << std::endl;

    std::atomic_thread_fence( std::memory_order_acq_rel );

    bool result = true;
    for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
    {
        if( !table[i] )
        {
            std::cout << "failed at " << i << std::endl;
            result = false;
        }
    }
    std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}

任何在正确方向上的推动都将受到极大的赞赏。我对内存栅栏非常陌生,而不是只使用互斥锁来处理所有事情,所以我可能只是从根本上误解了一些东西。

干杯J

EN

回答 4

Stack Overflow用户

发布于 2015-11-01 01:30:11

我会看一看Moody Camel的实现。

这是一个完全用C++11编写的快速、通用、无锁的C++队列,文档看起来相当不错,还有一些性能测试。

在所有其他有趣的东西中(它们无论如何都值得一读),它都包含在一个标题中,并且在简化的BSD许可下可用。把它放到你的项目中去享受吧!

票数 12
EN

Stack Overflow用户

发布于 2018-05-11 00:38:58

最简单的方法是使用循环缓冲区。也就是说,它就像是一个包含256个元素的数组,您使用uint8_t作为索引,所以当您溢出它时,它会回绕并从头开始。

你可以构建的最简单的原语是当你有一个生产者,一个消费者线程的时候。

缓冲区有两个头:

  • Write head:指向下一次写入的元素。
  • read head:指向下一次读取的元素。

生产者的操作:

  1. If write Head +1 == read head,缓冲区已满,返回buffer full错误。
  2. 将内容写入element.
  3. Insert内存屏障以同步CPU核心。

将写磁头向前移动。

在缓冲区已满的情况下,还有1个空间,但我们保留了它,以区别于缓冲区为空的情况。

消费者的操作:

如果read head == write head,缓冲区为空,则返回buffer empty错误。

  • 读取element.

  • Insert内存的内容以同步CPU核心。

  • 将读取头向前移动。

生产者拥有写磁头,消费者拥有读磁头,它们之间没有并发。此外,当操作完成时,头也会更新,这确保了使用者会留下已完成的元素,而消耗会留下完全消耗的空单元格。

每当你派生一个新线程时,在两个方向上创建2个这样的管道,你就可以与你的线程进行双向通信。

考虑到我们讨论的是锁自由,这也意味着没有线程被阻塞,当没有什么可做的时候,线程是空的,你可能想要检测到这一点,并在它发生时添加一些睡眠。

票数 3
EN

Stack Overflow用户

发布于 2018-08-30 19:16:11

这个lock free queue怎么样?

它是内存排序的无锁队列,但这需要在初始化队列时预先设置当前线程的数量。

例如:

代码语言:javascript
运行
复制
int* ret;
int max_concurrent_thread = 16;
lfqueue_t my_queue;

lfqueue_init(&my_queue, max_concurrent_thread );

/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
 while (lfqueue_enq(&my_queue, int_data) == -1) {
    printf("ENQ Full ?\n");
}

/** Wrap This scope in other threads **/
/*Dequeue*/
while  ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
    printf("DEQ EMPTY ..\n");
}

// printf("%d\n", *(int*) ret );
free(ret);
/** End **/

lfqueue_destroy(&my_queue);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25709548

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档