我正在使用BlockingCollection
来实现生产者/消费者模式。我有一个异步循环,用要处理的数据填充集合,然后客户端可以在很长一段时间内访问这些数据。数据包很少到达,我希望在不使用阻塞调用的情况下完成轮询。
本质上,我正在寻找阻塞集合中不存在的BeginTake
和EndTake
之类的东西,这样我就可以在回调中利用内部线程池。它无论如何都不一定要是一个BlockingCollection
。任何能满足我需要的东西都会很棒。
这就是我现在得到的。_bufferedPackets
是一个BlockingCollection<byte[]>
public byte[] Read(int timeout)
{
byte[] result;
if (_bufferedPackets.IsCompleted)
{
throw new Exception("Out of packets");
}
_bufferedPackets.TryTake(out result, timeout);
return result;
}
我希望这是这样的,用伪代码:
public void Read(int timeout)
{
_bufferedPackets.BeginTake(result =>
{
var bytes = _bufferedPackets.EndTake(result);
// Process the bytes, or the resuting timeout
}, timeout, _bufferedPackets);
}
我对此有什么选择?我不想让的任何线程处于等待状态,因为有很多其他IO需要它处理,而且我很快就会用完所有线程。
更新:我已经重写了有问题的代码,以不同的方式使用异步进程,本质上是根据在超时限制内是否有等待请求来交换回调。这可以很好地工作,但如果有一种方法可以做到这一点,而不是求助于计时器和交换lambda,这可能会导致竞争条件,并且很难编写(和理解),那就太棒了。我已经用自己的异步队列实现解决了这个问题,但如果有一个更标准的和经过良好测试的选项,那就太棒了。
发布于 2012-08-25 03:32:50
所以看起来没有一个内置的选项,我走了出去,试图尽我最大的努力来做我想要的实验。事实证明,为了让这种模式大致像旧异步模式的其他用户一样工作,需要做很多繁琐的工作。
public class AsyncQueue<T>
{
private readonly ConcurrentQueue<T> queue;
private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue;
private class DequeueAsyncResult : IAsyncResult
{
public bool IsCompleted { get; set; }
public WaitHandle AsyncWaitHandle { get; set; }
public object AsyncState { get; set; }
public bool CompletedSynchronously { get; set; }
public T Result { get; set; }
public AsyncCallback Callback { get; set; }
}
public AsyncQueue()
{
dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
queue = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
DequeueAsyncResult asyncResult;
while (dequeueQueue.TryDequeue(out asyncResult))
{
if (!asyncResult.IsCompleted)
{
asyncResult.IsCompleted = true;
asyncResult.Result = item;
ThreadPool.QueueUserWorkItem(state =>
{
if (asyncResult.Callback != null)
{
asyncResult.Callback(asyncResult);
}
else
{
((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
}
});
return;
}
}
queue.Enqueue(item);
}
public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
{
T result;
if (queue.TryDequeue(out result))
{
var dequeueAsyncResult = new DequeueAsyncResult
{
IsCompleted = true,
AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset),
AsyncState = state,
CompletedSynchronously = true,
Result = result
};
if (null != callback)
{
callback(dequeueAsyncResult);
}
return dequeueAsyncResult;
}
var pendingResult = new DequeueAsyncResult
{
AsyncState = state,
IsCompleted = false,
AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset),
CompletedSynchronously = false,
Callback = callback
};
dequeueQueue.Enqueue(pendingResult);
Timer t = null;
t = new Timer(_ =>
{
if (!pendingResult.IsCompleted)
{
pendingResult.IsCompleted = true;
if (null != callback)
{
callback(pendingResult);
}
else
{
((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
}
}
t.Dispose();
}, new object(), timeout, Timeout.Infinite);
return pendingResult;
}
public T EndDequeue(IAsyncResult result)
{
var dequeueResult = (DequeueAsyncResult) result;
return dequeueResult.Result;
}
}
我不太确定IsComplete
属性的同步性,也不太清楚dequeueQueue
是如何在随后的Enqueue
调用中被清除的。我也不确定什么时候是发出等待句柄信号的正确时间,但这是迄今为止我得到的最好的解决方案。
请不要以任何方式考虑此产品质量代码。我只是想展示一下我是如何在没有等待锁的情况下保持所有线程旋转的。我确信这充满了各种各样的边缘案例和错误,但它满足了需求,我想给遇到这个问题的人一些回馈。
发布于 2012-08-24 14:17:01
我可能误解了您的情况,但是您不能使用非阻塞集合吗?
我创建这个例子是为了说明:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncTakeFromBlockingCollection
{
class Program
{
static void Main(string[] args)
{
var queue = new ConcurrentQueue<string>();
var producer1 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i += 1)
{
queue.Enqueue("=======");
Thread.Sleep(10);
}
});
var producer2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i += 1)
{
queue.Enqueue("*******");
Thread.Sleep(3);
}
});
CreateConsumerTask("One ", 3, queue);
CreateConsumerTask("Two ", 4, queue);
CreateConsumerTask("Three", 7, queue);
producer1.Wait();
producer2.Wait();
Console.WriteLine(" Producers Finished");
Console.ReadLine();
}
static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
{
Task.Factory.StartNew(() =>
{
while (true)
{
string result;
if (queue.TryDequeue(out result))
{
Console.WriteLine(" {0} consumed {1}", taskName, result);
}
Thread.Sleep(sleepTime);
}
});
}
}
}
下面是程序的输出
我相信BlockingCollection的目的是包装一个并发的集合,并提供一种机制,允许多个消费者阻塞;等待生产者。这种用法似乎与您的要求相反。
发布于 2012-08-30 03:28:30
我很确定BlockingCollection<T>
做不到这一点,你必须自己动手。我想出了这个:
class NotifyingCollection<T>
{
private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>();
private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>();
private object _lock = new object();
public void Add(T item)
{
_overflow.Enqueue(item);
Dispatch();
}
private void Dispatch()
{
// this lock is needed since we need to atomically dequeue from both queues...
lock (_lock)
{
while (_overflow.Count > 0 && _subscribers.Count > 0)
{
Action<T> callback;
T item;
var r1 = _overflow.TryDequeue(out item);
var r2 = _subscribers.TryDequeue(out callback);
Debug.Assert(r1 && r2);
callback(item);
// or, optionally so that the caller thread's doesn't take too long ...
Task.Factory.StartNew(() => callback(item));
// but you'll have to consider how exceptions will be handled.
}
}
}
public void TakeAsync(Action<T> callback)
{
_subscribers.Enqueue(callback);
Dispatch();
}
}
我使用调用TakeAsync()
或Add()
的线程作为回调线程。当您调用Add()
或TakeAsync()
时,它将尝试将所有排队的项分派给排队的回调函数。这样就不会创建线程,它们只是在那里休眠,等待信号。
这个锁有点难看,但是你可以在没有锁的情况下在多个线程上排队和订阅。我想不出一种方法,如果在不使用锁的情况下,另一个队列上有可用的东西,那么就只能将一个队列出队。
注意:我只用了几个线程对其进行了最低限度的测试。
https://stackoverflow.com/questions/11949424
复制相似问题