首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何并行和串行处理集合中的项

如何并行和串行处理集合中的项
EN

Stack Overflow用户
提问于 2012-09-03 12:28:08
回答 3查看 2K关注 0票数 2

我有一个集合,它有要处理的元素,最多只有四个元素可以一起处理。在运行时,所有进程一起启动,所有进程都进入等待状态。一次只处理四个元素。

问题是处理元素是随机选择的,因为所有线程都在等待资源释放。表示第一个元素可以是集合中的最后一个元素。

但是,我需要按照元素在集合中的顺序来处理元素。

请告诉我怎样才能做到这一点?

我使用的是第三方物流和C# 4.0

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2012-10-02 14:39:49

这就是我完成这项任务的方式

代码语言:javascript
运行
复制
public delegate void ProcessFinished(IParallelProcess process);
public interface IParallelProcess
{
    void Start();
    event ProcessFinished ProcessFinished;
}

public class ParallelProcessBasket : ConcurrentQueue<IParallelProcess>
{
    public void Put(IParallelProcess process)
    {
        base.Enqueue(process);
    }
    public IParallelProcess Get()
    {
        IParallelProcess process = null;
        base.TryDequeue(out process);
        return process;
    }
}
public class ParallelProcessor<T> where T : class
{
    private ParallelProcessBasket basket;
    private readonly int MAX_DEGREE_OF_PARALLELISM;
    private Action<T> action;
    public ParallelProcessor(int degreeOfParallelism, IEnumerable<IParallelProcess> processes, Action<T> action)
    {
        basket = new ParallelProcessBasket();
        this.action = action;
        processes.ToList().ForEach(
            (p) =>
            {
                basket.Enqueue(p);
                p.ProcessFinished += new ProcessFinished(p_ProcessFinished);
            });
        MAX_DEGREE_OF_PARALLELISM = degreeOfParallelism;
    }

    private void p_ProcessFinished(IParallelProcess process)
    {
        if (!basket.IsEmpty)
        {
            T element = basket.Get() as T;
            if (element != null)
            {
                Task.Factory.StartNew(() => action(element));
            }
        }
    }


    public void StartProcessing()
    {
        // take first level of iteration
        for (int cnt = 0; cnt < MAX_DEGREE_OF_PARALLELISM; cnt++)
        {
            if (!basket.IsEmpty)
            {
                T element = basket.Get() as T;
                if (element != null)
                {
                    Task.Factory.StartNew(() => action(element));
                }
            }
        }
    }
}
static void Main(string[] args)    
{
     ParallelProcessor<ParallelTask> pr = new ParallelProcessor<ParallelTask>(Environment.ProcessorCount, collection, (e) => e.Method1());
            pr.StartProcessing();
}

谢谢..

票数 0
EN

Stack Overflow用户

发布于 2012-09-03 14:25:02

对于并行,总是存在一个定义“按顺序”的意思的问题。假设您有一个包含100个项目的集合。“一次处理4个订单”(如您所请求的)可能意味着:

  1. 松散排序:使用4个线程,并按原始集合的顺序发出任务。

在这种情况下,您可以使用:

新项目po =新代码(){ MaxDegreeOfParallelism =4 };新项目po,( ParallelOptions )ParallelOptions{ // MaxDegreeOfParallelism });

在任务不均衡的情况下,会很快失去原来的顺序,因为一些线程在重任务上可能会落后,但会按顺序分配任务。

  • 严格排序:按4个一组的顺序进行处理,如下所示:

0 1 2 3 4任务_____________________________屏障4 5 6 7 4任务_____________________________屏障等。

在这种情况下,您可以使用一个屏障:

Parallel.ForEach(list.AsParallel().AsOrdered(),b=新屏障(4);代码po =新ParallelOptions() { MaxDegreeOfParallelism =4 };ParallelOptions po,(item) => { // ParallelOptions b.SignalAndWait();});

尽管您必须确保任务的数量是4的倍数,否则屏障不会在单个任务的最后iterations.

  • Process 4项时发出信号:您可以创建一个封装原始列表的4项的任务对象,然后像第一种情况那样执行一个简单的Parallel.ForEach (即每个线程将作为单个任务的一部分顺序处理4项)。这将按顺序发出4个一组的任务,但如果任务花费的时间太长,可能会再次导致一些线程落后。
票数 5
EN

Stack Overflow用户

发布于 2012-09-03 17:59:59

我不清楚你到底在做什么,“元素是随机选择的”。但是如果您使用Paralle.ForEach(),那么它会试图提高效率,因此它会以某种方式对输入序列进行分区。如果输入序列是IList<T>,它将使用范围分区,否则,它将使用块分区(请参阅)。

如果希望按顺序处理这些项,可以使用custom partitioner配置Parallel.ForEach(),这会将集合划分为大小为1的块。

但由于您在这里并不真正需要Parallel.ForEach(),因此可能更简单的解决方案是创建4个任务,逐个处理这些项。对于同步,您可以使用BlockingCollection。类似于:

代码语言:javascript
运行
复制
public static class ParallelOrdered
{
    public static void ForEach<T>(IEnumerable<T> collection, Action<T> action, int degreeOfParallelism)
    {
        var blockingCollection = new BlockingCollection<T>();
        foreach (var item in collection)
            blockingCollection.Add(item);
        blockingCollection.CompleteAdding();

        var tasks = new Task[degreeOfParallelism];
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () =>
                {
                    foreach (var item in blockingCollection.GetConsumingEnumerable())
                        action(item);
                });
        }
        Task.WaitAll(tasks);
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/12241983

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档