有一段时间,我一直在为C#中的委托寻找一个无锁、简单和可伸缩的多生产者、单一消费者队列的实现。我想我终于有了。我对它进行了基本测试,证明它是有效的,而且设计非常简单,以至于我设法说服自己它是坚如磐石的。
这依赖于比较和交换方法来更新队列,类似于C# 4.0 (请看这里)中用于生成事件字段访问器的新的无锁模式,并结合Interlocked.Exchange读取并将队列设置为null。
本质上,这源于这样的认识:消息队列实际上是一次发送的多播委托,在消息执行后重置它们的调用列表!
然而,并行代码是很难得到正确的,所以我想确认这个模式确实是正确的。我发现我的直觉可能会令人惊讶的误导,当涉及到并行,总是有一些疯狂的边缘情况把我赶走…
因此,对于这个问题:有人能向我确认下面的消息队列设计模式是线程安全的吗?
public class MessageQueue
{
Action queue;
public void Enqueue(Action message)
{
Action currentQueue;
var previousQueue = queue;
do
{
currentQueue = previousQueue;
var newQueue = currentQueue + message;
previousQueue = Interlocked.CompareExchange(ref queue, newQueue, currentQueue);
}
while (previousQueue != currentQueue);
}
public void Process()
{
var current = Interlocked.Exchange(ref queue, null);
if (current != null)
{
current();
}
}
}
发布于 2014-08-13 12:33:57
就线程安全性而言,您的代码是可以的。但有几件事我想指出:
您的变量(previousQueue
、newQueue
和currentQueue
)的含义不太清楚,至少对我来说并非如此。在编写多线程代码时,可读性变得非常重要。
此外,对于CAS-循环,我总是发现一个“而(真)-断”循环要容易得多,但这是我个人的看法。
以下是我关于提高可读性的建议:
while(true)
{
var expectedOldQueue = queue;
var newQueue = expectedOldQueue + message;
var actualOldQueue = Interlocked.CompareExchange(ref queue, newQueue, expectedOldQueue);
if(expectedOldQueue == actualOldQueue)
break;
}
另外,我有点担心你选择的设计来实现多生产者/单一消费者队列,或者我只是不理解.
如果我正确理解,生产者将排队操作,而不是需要消费的项目,生产者只需调用Process
来触发这些操作,对吗?
所以,代替这个:
//producer
queue.EnqueueItem(item);
//consumer
var item = queue.Dequeue();
Console.WriteLine(item);
你的建议是:
//producer
queue.Enqueue(() => Console.WriteLine(item));
//consumer
queue.Process();
如果是这样的话,我担心这是因为它违背了消费者/生产者体系结构的主旋律,即消费者与生产者分离,不知道如何消费商品。
根据您的建议,生产者将负责生产工作项目,并定义如何使用它们。
发布于 2014-08-13 15:00:34
我认为合并代表是实现这一目标的一种奇怪的方式。这种方法的缺点是它使正确的异常处理变得困难。使用您的代码,每当一个已排队的操作抛出异常时,Process()
都会抛出该异常,并且在同一委托中的所有操作都不会被执行。
如果这不是您想要的,那么在仍然使用梳状委托的同时更改它将是很尴尬的(您必须使用GetInvocationList()
。
我认为一个更干净的解决方案是使用一个线程安全的集合,比如ConcurrentQueue
,甚至是来自TPL的ActionBlock
,它也将处理项目的调度消耗。
如果您想继续使用CompareExchange()
,也可以使用ImmutableQueue
。
Process()
将如何被调用?我认为,通过这种设计,消息队列的工作应该是安排执行委托,而不是执行类的用户。
这样做的一种方法是使用上面提到的ActionBlock
,但是还有其他选项(比如使用BlockingCollection
存储委托,然后有一个处理Task
,当集合为空时阻塞它)。
https://codereview.stackexchange.com/questions/59895
复制相似问题