首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >无锁的多生产者单消费者消息队列

无锁的多生产者单消费者消息队列
EN

Code Review用户
提问于 2014-08-13 12:24:36
回答 2查看 3.6K关注 0票数 5

有一段时间,我一直在为C#中的委托寻找一个无锁、简单和可伸缩的多生产者、单一消费者队列的实现。我想我终于有了。我对它进行了基本测试,证明它是有效的,而且设计非常简单,以至于我设法说服自己它是坚如磐石的。

这依赖于比较和交换方法来更新队列,类似于C# 4.0 (请看这里)中用于生成事件字段访问器的新的无锁模式,并结合Interlocked.Exchange读取并将队列设置为null。

本质上,这源于这样的认识:消息队列实际上是一次发送的多播委托,在消息执行后重置它们的调用列表!

然而,并行代码是很难得到正确的,所以我想确认这个模式确实是正确的。我发现我的直觉可能会令人惊讶的误导,当涉及到并行,总是有一些疯狂的边缘情况把我赶走…

因此,对于这个问题:有人能向我确认下面的消息队列设计模式是线程安全的吗?

代码语言:javascript
运行
复制
public class MessageQueue
{
    Action queue;

    public void Enqueue(Action message)
    {
        Action currentQueue;
        var previousQueue = queue;
        do
        {
            currentQueue = previousQueue;
            var newQueue = currentQueue + message;
            previousQueue = Interlocked.CompareExchange(ref queue, newQueue, currentQueue);
        }
        while (previousQueue != currentQueue);
    }

    public void Process()
    {
        var current = Interlocked.Exchange(ref queue, null);
        if (current != null)
        {
            current();
        }
    }
}
EN

回答 2

Code Review用户

回答已采纳

发布于 2014-08-13 12:33:57

就线程安全性而言,您的代码是可以的。但有几件事我想指出:

您的变量(previousQueuenewQueuecurrentQueue)的含义不太清楚,至少对我来说并非如此。在编写多线程代码时,可读性变得非常重要。

此外,对于CAS-循环,我总是发现一个“而(真)-断”循环要容易得多,但这是我个人的看法。

以下是我关于提高可读性的建议:

代码语言:javascript
运行
复制
while(true)
{
    var expectedOldQueue = queue;
    var newQueue = expectedOldQueue + message;
    var actualOldQueue = Interlocked.CompareExchange(ref queue, newQueue, expectedOldQueue);

    if(expectedOldQueue == actualOldQueue)
        break;
}

另外,我有点担心你选择的设计来实现多生产者/单一消费者队列,或者我只是不理解.

如果我正确理解,生产者将排队操作,而不是需要消费的项目,生产者只需调用Process来触发这些操作,对吗?

所以,代替这个:

代码语言:javascript
运行
复制
//producer
queue.EnqueueItem(item);

//consumer
var item = queue.Dequeue();
Console.WriteLine(item);

你的建议是:

代码语言:javascript
运行
复制
//producer
queue.Enqueue(() => Console.WriteLine(item));

//consumer
queue.Process();

如果是这样的话,我担心这是因为它违背了消费者/生产者体系结构的主旋律,即消费者与生产者分离,不知道如何消费商品。

根据您的建议,生产者将负责生产工作项目,并定义如何使用它们。

票数 3
EN

Code Review用户

发布于 2014-08-13 15:00:34

我认为合并代表是实现这一目标的一种奇怪的方式。这种方法的缺点是它使正确的异常处理变得困难。使用您的代码,每当一个已排队的操作抛出异常时,Process()都会抛出该异常,并且在同一委托中的所有操作都不会被执行。

如果这不是您想要的,那么在仍然使用梳状委托的同时更改它将是很尴尬的(您必须使用GetInvocationList()

我认为一个更干净的解决方案是使用一个线程安全的集合,比如ConcurrentQueue,甚至是来自TPL的ActionBlock,它也将处理项目的调度消耗。

如果您想继续使用CompareExchange(),也可以使用ImmutableQueue

Process()将如何被调用?我认为,通过这种设计,消息队列的工作应该是安排执行委托,而不是执行类的用户。

这样做的一种方法是使用上面提到的ActionBlock,但是还有其他选项(比如使用BlockingCollection存储委托,然后有一个处理Task,当集合为空时阻塞它)。

票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/59895

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档