首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >具有优先级队列的Parallel.ForEach在.NET6中

具有优先级队列的Parallel.ForEach在.NET6中
EN

Stack Overflow用户
提问于 2022-03-15 08:37:54
回答 2查看 444关注 0票数 1

我试图在我的Parallel.ForEach上运行Priority Queue,但是我得到了以下错误:

严重性代码描述项目文件行抑制状态错误CS0411方法'Parallel.ForEach(OrderablePartitioner,ParallelOptions,Action)‘的类型参数不能从使用中推断。尝试显式指定类型参数。TPL_POC.PL

我知道如何用IEnumerableList来执行List,但是下面这些都没有什么进展。

代码语言:javascript
运行
复制
private void ProcessTasksParallely()
{
    PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
    foreach (var task in this.tasks)
    {
        activeTasksPriority.Enqueue(task.Task, task.Id);
    }
    Console.WriteLine("Processing");

    var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };

    Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
        options,
        (t, priority) =>
        {
            Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
}

我正在尝试这样做,因为我需要并行地处理任务,但要根据它们的优先级来安排。

EN

回答 2

Stack Overflow用户

发布于 2022-03-15 09:14:50

PriorityQueue类没有提供一种将其作为开箱即用的IEnumerable来使用的方法。它只有一个UnorderedItems属性,这不是您想要的。此属性不按特定顺序生成队列的内容,而不消耗它们。不过,为PriorityQueue<TElement, TPriority>类实现自定义的PriorityQueue<TElement, TPriority>方法很容易,如下所示:

代码语言:javascript
运行
复制
/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
    GetConsumingEnumerable<TElement, TPriority>(
    this PriorityQueue<TElement, TPriority> source)
{
    while (source.TryDequeue(out TElement element, out TPriority priority))
    {
        yield return (element, priority);
    }
}

用法示例:

代码语言:javascript
运行
复制
var partitioner = Partitioner.Create(activeTasksPriority.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, entry =>
{
    var (t, priority) = entry;
    Console.WriteLine($"Priority: {priority}, Task: {t}");
    Thread.Sleep(100);
});

Partitioner.Create+NoBuffering的目的是防止Parallel.ForEach预先使用元素并将其存储到缓冲区中,然后再对其进行处理。

票数 2
EN

Stack Overflow用户

发布于 2022-03-15 10:05:45

如果您想在发布/子场景中实现优先级,那么Parallel.ForEachPriorityQueue<T>都是错误的选择。

  • Parallel.ForEach是为数据并行构建的--通过对内存中的大量数据进行分区处理,并且每个内核使用大约一个工作任务来处理每个分区,并且同步性最小。这里不需要PriorityQueue --如果您想要一个特定的订单,可以使用例如PLINQ和OrderBy强制执行。
  • 优先级不可避免地改变了感知到的项目顺序和队列状态,这对于并发性来说是一个很大的不-否。
  • 轻重缓急会被颠倒。所有工作任务都可能忙于处理低优先级项,而新的高优先级项正在等待。更糟糕的是,Parallel.ForEach缓冲区项使用的默认分区程序。这意味着一个新的高优先级项可能需要等待多个低优先级项。您必须使用Partitioner.Create禁用缓冲的选项

在高吞吐量网络和消息传递中,优先级处理是通过多个队列执行的,而不是单个优先级队列。高优先级队列获得更多资源或在低优先级队列之前被处理。

每个优先级类一个队列

这就是高度可扩展的消息传递系统的工作方式,因为它不需要任何同步来确定下一步要处理的项。

实现此策略的一种方法是使用多个ActionBlock实例,每个实例具有不同数量的工作任务:

代码语言:javascript
运行
复制
async Task ProcessMessage(string msg) {...}

ExecutionDataflowBlockOptions WithDop(int dop)=>new ExecutionDataflowBlockOptions{ 
    MaxDegreeOfParallelism = dop
};


void BuildQueues()
{ 

   _highQueue=new ActionBlock<string>(ProcessMessage,WithDop(4));

   _midQueue=new ActionBlock<string>(ProcessMessage,WithDop(2));

   _lowQueue=new ActionBlock<string>(ProcessMessage,WithDop(1));
}

public void Process(string msg,int priority)
{
    var queue= priority switch {
          0 => _highQueue,
          1 => _midQueue,
          _ => _lowQueue
    }
    queue.Post(msg);    
}

async Task Complete()
{
    _highQueue.Complete();
    _midQueue.Complete();
    _lowQueue.Complete();
    await Task.WhenAll(
        _hiqhQueue.Completion, 
        _midQueue.Completion, 
        _lowQueue.Completion
    );
}

在本例中,Process使用模式匹配将消息路由到适当的ActionBlock。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71479188

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档