首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何平衡和并行多个顺序数据流的处理

如何平衡和并行多个顺序数据流的处理
EN

Stack Overflow用户
提问于 2012-03-06 04:04:47
回答 3查看 555关注 0票数 3

我有一个必须尽快处理的数据流。单个流包含多达200个来源的数据。并不是所有的源都产生相同的数据量,并且速率可能会有所不同。

作为最初的尝试,我决定创建10个(某种程度上基于服务器规格,双四核),长时间运行的任务。每个任务将从一个BlockCollection中读取。在开始之前,我创建了一个映射,以便在入站数据流上接收到数据时,我知道要将该源数据添加到哪个BlockingCollection。

问题是,我认为,我不知道哪个来源会产生最多的数据,事实上,随着时间的推移,这一点可能会改变。我看到一些集合非常空,而另一些则收到了更多的更新。

如果我有8个可用的硬件线程,并且我已经创建了大约10个队列,并且任务没有绑定到线程(同样不确定TaskCreationOptions.LongRunning是否如此),那么即使一个队列不忙,另一个忙队列也不能利用空闲线程,因为从理论上讲,我可能会无序地处理一段数据。

我只是为每个源创建一个Task并阻塞集合,这样TPL就可以最大限度地利用可用的线程,因为数据处于最隔离状态,这是否更好?

我的另一个选择是以某种方式研究过去的统计数据和各种外部/人工信息,如何最好地在有限的BlockingCollections/任务集中传播源,然后随着时间的推移调整映射。

我希望我已经足够好地解释了我的场景。

我使用了一个封装BlockingCollection and Task的类

我有什么可以可视化为40+流交错,如果拆分可以同时处理(只要每个流保持在它自己的顺序),但有更多的流比可用的硬件线程。

编辑-尝试澄清我的问题

试着弄清楚我在找什么。我目前正在有效地将源代码拆分成子组,并为每个组分配自己的队列。我的问题是:要创建多少个组?如果我有200个源代码,我应该创建200个组(然后是200个任务和阻塞集合),然后让TPL像个疯人一样到处跑,尽可能地分配线程,因为每个任务都有自己的cpu时间。或者我最好为每个底层硬件线程分配一个组?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2012-03-06 06:03:14

如果您为每个源创建了一个Task和一个队列,那么只要该Task在队列为空时完成,就应该可以工作。您将肯定地知道,在一个源中的项目的排序是保持的,并且在必要时,您应该有充分的CPU利用率。但是,如果所有线程当前都在处理,并且新数据来自低频源,则可能需要等待很长时间才能处理该数据。

如果这对您来说是一个问题,您应该自己管理处理的确切顺序,而不是依赖于Task。要做到这一点,您可以为每个源分别使用一个全局队列和一个本地队列。为了维护顺序,全局队列或当前处理中最多只能有一个数据项。当完成项的处理时,如果可能,项将从正确的本地队列移动到全局队列。这样,您应该获得更公平的数据处理顺序。

代码可能如下所示:

代码语言:javascript
运行
复制
class SourcesManager<T>
{
    private readonly BlockingCollection<Tuple<T, Source<T>>> m_queue =
        new BlockingCollection<Tuple<T, Source<T>>>();

    public Source<T> CreateSource()
    {
        return new Source<T>(m_queue);
    }

    // blocks if no items are available and Complete() hasn't been called
    public bool TryProcess(Action<T> action)
    {
        Tuple<T, Source<T>> tuple;
        if (m_queue.TryTake(out tuple, Timeout.Infinite))
        {
            action(tuple.Item1);
            tuple.Item2.TryDequeue();
            return true;
        }

        return false;
    }

    public void Complete()
    {
        m_queue.CompleteAdding();
    }
}

class Source<T>
{
    private readonly Queue<T> m_localQueue = new Queue<T>();
    private readonly BlockingCollection<Tuple<T, Source<T>>> m_managerQueue;
    private volatile bool m_managerHasData = false;

    internal Source(BlockingCollection<Tuple<T, Source<T>>> managerQueue)
    {
        m_managerQueue = managerQueue;
    }

    public void Enqueue(T data)
    {
        lock (m_localQueue)
        {
            if (!m_managerHasData)
            {
                m_managerQueue.Add(Tuple.Create(data, this));
                m_managerHasData = true;
            }
            else
                m_localQueue.Enqueue(data);
        }
    }

    internal bool TryDequeue()
    {
        lock (m_localQueue)
        {
            if (m_localQueue.Count == 0)
            {
                m_managerHasData = false;
                return false;
            }

            m_managerQueue.Add(Tuple.Create(m_localQueue.Dequeue(), this));
            return true;
        }
    }
}
票数 0
EN

Stack Overflow用户

发布于 2012-03-08 08:14:16

我个人会在这里利用TPL Dataflow,只是定义一个表示您的工作的ActionBlock<T>,并在它的“前面”链接一个BufferBlock<T>,以防止不同的生产者过饱和。然后,您所要做的就是从各种来源(生产者)发布到BufferBlock<T>,并确保您已经对块选项(BoundedCapacityMaxDegreeOfParallelismMaxMessagesPerTask等)进行了负载测试/配置。因此,让TPL Dataflow发挥它的魔力。把所有的重担都从你的手中拿出来。

票数 1
EN

Stack Overflow用户

发布于 2012-03-06 04:08:04

我相信管道方法会对你有所帮助。

管道模式使用并行任务和并发队列来处理一系列输入值。每个任务实现流水线的一个阶段,队列充当缓冲区,允许流水线的各个阶段并发执行,即使值是按顺序处理的。您可以将软件管道视为类似于工厂中的装配线,其中装配线中的每个项目都是分阶段构建的。将部分装配的物品从一个装配阶段传递到另一个装配阶段。装配线的输出顺序与输入的顺序相同。

请参阅以下MSDN论文和示例:

  • Pipelines
  • How to: Use Arrays of Blocking Collections in a Pipeline
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/9573183

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档