我有一个MessagesManager线程,不同的线程可以向它发送消息,然后这个MessagesManager线程负责在SendMessageToTcpIP() ( MessagesManager线程的起点)中发布这些消息。
class MessagesManager : IMessageNotifier
{
//private
private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>();
public void PublishMessage(string Message)
{
MessagesQueue.Enqueue(Message);
_waitTillMessageQueueEmptyARE.Set();
}
public void SendMessageToTcpIP()
{
//keep waiting till a new message comes
while (MessagesQueue.Count() == 0)
{
_waitTillMessageQueueEmptyARE.WaitOne();
}
//Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
Queue<string> localMessagesQueue = new Queue<string>();
while (!MessagesQueue.IsEmpty)
{
string message;
bool isRemoved = MessagesQueue.TryDequeue(out message);
if (isRemoved)
localMessagesQueue.Enqueue(message);
}
//Use the Local Queue for further processing
while (localMessagesQueue.Count() != 0)
{
TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
Thread.Sleep(2000);
}
}
}不同的线程(3-4)通过调用PublishMessage(string Message) (对MessageManager使用相同的对象)发送消息。消息出现后,我将该消息推入并发队列,并通过设置_waitTillMessageQueueEmptyARE.Set();通知SendMessageToTcpIP()。SendMessageToTcpIP()**,中的,我是从本地队列中的并发队列复制消息,然后逐个发布。
问题:以这种方式进行排队和脱队列是线程安全吗?会不会有什么奇怪的影响呢?
发布于 2017-03-30 12:30:26
虽然这可能是线程安全的,但是在.NET中有内置类来帮助开发“许多发布者一个消费者”的模式,比如BlockingCollection。你可以这样重写你的班级:
class MessagesManager : IDisposable {
// note that your ConcurrentQueue is still in play, passed to constructor
private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());
public MessagesManager() {
// start consumer thread here
new Thread(SendLoop) {
IsBackground = true
}.Start();
}
public void PublishMessage(string Message) {
// no need to notify here, will be done for you
MessagesQueue.Add(Message);
}
private void SendLoop() {
// this blocks until new items are available
foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
// ensure that you handle exceptions here, or whole thing will break on exception
TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
Thread.Sleep(2000); // only if you are sure this is required
}
}
public void Dispose() {
// this will "complete" GetConsumingEnumerable, so your thread will complete
MessagesQueue.CompleteAdding();
MessagesQueue.Dispose();
}
}发布于 2017-03-30 12:43:27
.NET已经提供了允许将消息发送到缓冲区并异步处理消息的ActionBlock< T>。默认情况下,一次只处理一条消息。
您的代码可以重写为:
//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
await Task.Delay(2000);
);
//In some other thread ...
foreach ( ....)
{
_hmiAgent.Post(someMessage);
}
// When the application closes
_hmiAgent.Complete();
await _hmiAgent.Completion;ActionBlock提供了许多好处--您可以在缓冲区中指定它可以接受的项的数量限制,并指定可以并行处理多条消息。还可以在处理管道中组合多个块。在桌面应用程序中,可以将消息发布到管道以响应事件,通过单独的块处理消息,并将结果发布到更新UI的最终块中。
例如,填充可以由中间TransformBlock< TIn,TOut>执行。这种转换是微不足道的,使用块的成本要比方法大,但这只是一个例子:
//In an initialization function
TransformBlock<string> _hmiAgent=new TransformBlock<string,string>(
msg=>msg.PadRight(80, ' '));
ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
TcpIpMessageSenderClient.ConnectAndSendMessage());
await Task.Delay(2000);
);
var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock);投递代码一点也不改变
_hmiAgent.Post(someMessage);当应用程序终止时,我们需要等待_tcpBlock完成:
_hmiAgent.Complete();
await _tcpBlock.Completion;Visual 2015+本身对此类场景使用TPL数据流。
Bar Arnon在TPL数据流是您不使用的最佳库中提供了一个更好的示例,展示了如何在块中使用同步和异步方法。
发布于 2017-03-30 12:29:52
这是我如何实现此功能的重构代码片段:
class MessagesManager {
private readonly AutoResetEvent messagesAvailableSignal = new AutoResetEvent(false);
private readonly ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();
public void PublishMessage(string Message) {
messageQueue.Enqueue(Message);
messagesAvailableSignal.Set();
}
public void SendMessageToTcpIP() {
while (true) {
messagesAvailableSignal.WaitOne();
while (!messageQueue.IsEmpty) {
string message;
if (messageQueue.TryDequeue(out message)) {
TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' '));
}
}
}
}
}请注意:
https://stackoverflow.com/questions/43117502
复制相似问题