我有一个必须尽快处理的数据流。单个流包含多达200个来源的数据。并不是所有的源都产生相同的数据量,并且速率可能会有所不同。
作为最初的尝试,我决定创建10个(某种程度上基于服务器规格,双四核),长时间运行的任务。每个任务将从一个BlockCollection中读取。在开始之前,我创建了一个映射,以便在入站数据流上接收到数据时,我知道要将该源数据添加到哪个BlockingCollection。
问题是,我认为,我不知道哪个来源会产生最多的数据,事实上,随着时间的推移,这一点可能会改变。我看到一些集合非常空,而另一些则收到了更多的更新。
如果我有8个可用的硬件线程,并且我已经创建了大约10个队列,并且任务没有绑定到线程(同样不确定TaskCreationOptions.LongRunning是否如此),那么即使一个队列不忙,另一个忙队列也不能利用空闲线程,因为从理论上讲,我可能会无序地处理一段数据。
我只是为每个源创建一个Task并阻塞集合,这样TPL就可以最大限度地利用可用的线程,因为数据处于最隔离状态,这是否更好?
我的另一个选择是以某种方式研究过去的统计数据和各种外部/人工信息,如何最好地在有限的BlockingCollections/任务集中传播源,然后随着时间的推移调整映射。
我希望我已经足够好地解释了我的场景。
我使用了一个封装BlockingCollection and Task的类
我有什么可以可视化为40+流交错,如果拆分可以同时处理(只要每个流保持在它自己的顺序),但有更多的流比可用的硬件线程。
编辑-尝试澄清我的问题
试着弄清楚我在找什么。我目前正在有效地将源代码拆分成子组,并为每个组分配自己的队列。我的问题是:要创建多少个组?如果我有200个源代码,我应该创建200个组(然后是200个任务和阻塞集合),然后让TPL像个疯人一样到处跑,尽可能地分配线程,因为每个任务都有自己的cpu时间。或者我最好为每个底层硬件线程分配一个组?
发布于 2012-03-06 06:03:14
如果您为每个源创建了一个Task和一个队列,那么只要该Task在队列为空时完成,就应该可以工作。您将肯定地知道,在一个源中的项目的排序是保持的,并且在必要时,您应该有充分的CPU利用率。但是,如果所有线程当前都在处理,并且新数据来自低频源,则可能需要等待很长时间才能处理该数据。
如果这对您来说是一个问题,您应该自己管理处理的确切顺序,而不是依赖于Task。要做到这一点,您可以为每个源分别使用一个全局队列和一个本地队列。为了维护顺序,全局队列或当前处理中最多只能有一个数据项。当完成项的处理时,如果可能,项将从正确的本地队列移动到全局队列。这样,您应该获得更公平的数据处理顺序。
代码可能如下所示:
class SourcesManager<T>
{
private readonly BlockingCollection<Tuple<T, Source<T>>> m_queue =
new BlockingCollection<Tuple<T, Source<T>>>();
public Source<T> CreateSource()
{
return new Source<T>(m_queue);
}
// blocks if no items are available and Complete() hasn't been called
public bool TryProcess(Action<T> action)
{
Tuple<T, Source<T>> tuple;
if (m_queue.TryTake(out tuple, Timeout.Infinite))
{
action(tuple.Item1);
tuple.Item2.TryDequeue();
return true;
}
return false;
}
public void Complete()
{
m_queue.CompleteAdding();
}
}
class Source<T>
{
private readonly Queue<T> m_localQueue = new Queue<T>();
private readonly BlockingCollection<Tuple<T, Source<T>>> m_managerQueue;
private volatile bool m_managerHasData = false;
internal Source(BlockingCollection<Tuple<T, Source<T>>> managerQueue)
{
m_managerQueue = managerQueue;
}
public void Enqueue(T data)
{
lock (m_localQueue)
{
if (!m_managerHasData)
{
m_managerQueue.Add(Tuple.Create(data, this));
m_managerHasData = true;
}
else
m_localQueue.Enqueue(data);
}
}
internal bool TryDequeue()
{
lock (m_localQueue)
{
if (m_localQueue.Count == 0)
{
m_managerHasData = false;
return false;
}
m_managerQueue.Add(Tuple.Create(m_localQueue.Dequeue(), this));
return true;
}
}
}发布于 2012-03-08 08:14:16
我个人会在这里利用TPL Dataflow,只是定义一个表示您的工作的ActionBlock<T>,并在它的“前面”链接一个BufferBlock<T>,以防止不同的生产者过饱和。然后,您所要做的就是从各种来源(生产者)发布到BufferBlock<T>,并确保您已经对块选项(BoundedCapacity、MaxDegreeOfParallelism、MaxMessagesPerTask等)进行了负载测试/配置。因此,让TPL Dataflow发挥它的魔力。把所有的重担都从你的手中拿出来。
发布于 2012-03-06 04:08:04
我相信管道方法会对你有所帮助。
管道模式使用并行任务和并发队列来处理一系列输入值。每个任务实现流水线的一个阶段,队列充当缓冲区,允许流水线的各个阶段并发执行,即使值是按顺序处理的。您可以将软件管道视为类似于工厂中的装配线,其中装配线中的每个项目都是分阶段构建的。将部分装配的物品从一个装配阶段传递到另一个装配阶段。装配线的输出顺序与输入的顺序相同。
请参阅以下MSDN论文和示例:
https://stackoverflow.com/questions/9573183
复制相似问题