我试图在我的Parallel.ForEach
上运行Priority Queue
,但是我得到了以下错误:
严重性代码描述项目文件行抑制状态错误CS0411方法'Parallel.ForEach(OrderablePartitioner,ParallelOptions,Action)‘的类型参数不能从使用中推断。尝试显式指定类型参数。TPL_POC.PL
我知道如何用IEnumerable
和List
来执行List
,但是下面这些都没有什么进展。
private void ProcessTasksParallely()
{
PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
foreach (var task in this.tasks)
{
activeTasksPriority.Enqueue(task.Task, task.Id);
}
Console.WriteLine("Processing");
var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };
Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
options,
(t, priority) =>
{
Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(100);
});
}
我正在尝试这样做,因为我需要并行地处理任务,但要根据它们的优先级来安排。
发布于 2022-03-15 09:14:50
PriorityQueue
类没有提供一种将其作为开箱即用的IEnumerable
来使用的方法。它只有一个UnorderedItems
属性,这不是您想要的。此属性不按特定顺序生成队列的内容,而不消耗它们。不过,为PriorityQueue<TElement, TPriority>
类实现自定义的PriorityQueue<TElement, TPriority>
方法很容易,如下所示:
/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
GetConsumingEnumerable<TElement, TPriority>(
this PriorityQueue<TElement, TPriority> source)
{
while (source.TryDequeue(out TElement element, out TPriority priority))
{
yield return (element, priority);
}
}
用法示例:
var partitioner = Partitioner.Create(activeTasksPriority.GetConsumingEnumerable(),
EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, options, entry =>
{
var (t, priority) = entry;
Console.WriteLine($"Priority: {priority}, Task: {t}");
Thread.Sleep(100);
});
Partitioner.Create
+NoBuffering
的目的是防止Parallel.ForEach
预先使用元素并将其存储到缓冲区中,然后再对其进行处理。
发布于 2022-03-15 10:05:45
如果您想在发布/子场景中实现优先级,那么Parallel.ForEach
和PriorityQueue<T>
都是错误的选择。
Parallel.ForEach
是为数据并行构建的--通过对内存中的大量数据进行分区处理,并且每个内核使用大约一个工作任务来处理每个分区,并且同步性最小。这里不需要PriorityQueue --如果您想要一个特定的订单,可以使用例如PLINQ和OrderBy
强制执行。Parallel.ForEach
缓冲区项使用的默认分区程序。这意味着一个新的高优先级项可能需要等待多个低优先级项。您必须使用Partitioner.Create
和禁用缓冲的选项在高吞吐量网络和消息传递中,优先级处理是通过多个队列执行的,而不是单个优先级队列。高优先级队列获得更多资源或在低优先级队列之前被处理。
每个优先级类一个队列
这就是高度可扩展的消息传递系统的工作方式,因为它不需要任何同步来确定下一步要处理的项。
实现此策略的一种方法是使用多个ActionBlock实例,每个实例具有不同数量的工作任务:
async Task ProcessMessage(string msg) {...}
ExecutionDataflowBlockOptions WithDop(int dop)=>new ExecutionDataflowBlockOptions{
MaxDegreeOfParallelism = dop
};
void BuildQueues()
{
_highQueue=new ActionBlock<string>(ProcessMessage,WithDop(4));
_midQueue=new ActionBlock<string>(ProcessMessage,WithDop(2));
_lowQueue=new ActionBlock<string>(ProcessMessage,WithDop(1));
}
public void Process(string msg,int priority)
{
var queue= priority switch {
0 => _highQueue,
1 => _midQueue,
_ => _lowQueue
}
queue.Post(msg);
}
async Task Complete()
{
_highQueue.Complete();
_midQueue.Complete();
_lowQueue.Complete();
await Task.WhenAll(
_hiqhQueue.Completion,
_midQueue.Completion,
_lowQueue.Completion
);
}
在本例中,Process
使用模式匹配将消息路由到适当的ActionBlock。
https://stackoverflow.com/questions/71479188
复制相似问题