我有一堆请求要处理,在处理这些请求时,可以生成更多的“子请求”,并将其添加到相同的阻塞集合中。使用者将子请求添加到队列中。
很难知道何时退出消费循环:显然没有线程可以调用BlockingCollection.CompleteAdding
,因为其他线程可能会向集合添加一些内容。您也不能仅仅因为BlockingCollection
是空的而退出消费循环,因为另一个线程可能刚刚读取了来自BlockingCollection
的最后剩余请求,并且将开始生成更多的请求-- BlockingCollection
的Count
将再次从零增加。
到目前为止,我对此的唯一想法是使用Barrier
--当所有线程都到达Barrier
时,BlockingCollection
中就没有任何东西了,并且没有线程能够生成新的请求。这是我的代码--这是一个可以接受的方法吗?(请注意:这是高度人为的代码块建模,一种更为复杂的情况:没有程序员真正编写处理随机字符串的代码)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
namespace Barrier1
{
class Program
{
private static readonly Random random = new Random();
private static void Main()
{
var bc = new BlockingCollection<string>();
AddRandomStringsToBc(bc, 1000, true);
int nTasks = 4;
var barrier = new Barrier(nTasks);
Action a = () => DoSomething(bc, barrier);
var actions = Enumerable.Range(0, nTasks).Select(x => a).ToArray();
Parallel.Invoke(actions);
}
private static IEnumerable<char> GetC(bool includeA)
{
var startChar = includeA ? 'A' : 'B';
var add = includeA ? 24 : 25;
while (true)
{
yield return (char)(startChar + random.Next(add));
}
}
private static void DoSomething(BlockingCollection<string> bc, Barrier barrier)
{
while (true)
{
if (bc.TryTake(out var str))
{
Console.WriteLine(str);
if (str[0] == 'A')
{
Console.WriteLine("Adding more strings...");
AddRandomStringsToBc(bc, 100);
}
}
else
{
// Can't exit the loop here just because there is nothing in the collection.
// A different thread may be just about to call AddRandomStringsToBc:
if (barrier.SignalAndWait(100))
{
break;
}
}
}
}
private static void AddRandomStringsToBc(BlockingCollection<string> bc, int n, bool startWithA = false, bool sleep = false)
{
var collection = Enumerable.Range(0, n).Select(x => string.Join("", GetC(startWithA).Take(5)));
foreach (var c in collection)
{
bc.Add(c);
}
}
}
}
发布于 2022-04-02 06:53:42
下面是一个类似于BlockingCollection
的集合,它的不同之处在于它自动完成,而不是依赖于手动调用CompleteAdding
方法。自动完成的条件是集合为空,并且所有使用者都处于等待状态。
该实现基于您聪明的想法,即使用Barrier
作为检查自动完成条件的机制。它并不完美,因为它依赖于池,而池是在集合变为空并且有一些仍处于活动状态的使用者时发生的。另一方面,它允许利用BlockingCollection<T>
类的所有现有功能,而不是从头重写它:
/// <summary>
/// A blocking collection that completes automatically when it's empty, and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T> : IEnumerable<T>, IDisposable
{
private readonly BlockingCollection<T> _queue;
private readonly Barrier _barrier;
private volatile bool _autoCompleteStarted;
private volatile int _intervalMilliseconds = 500;
public AutoCompleteBlockingCollection(int boundedCapacity = -1)
{
_queue = boundedCapacity == -1 ? new() : new(boundedCapacity);
_barrier = new(0, _ => _queue.CompleteAdding());
}
public int Count => _queue.Count;
public int BoundedCapacity => _queue.BoundedCapacity;
public bool IsAddingCompleted => _queue.IsAddingCompleted;
public bool IsCompleted => _queue.IsCompleted;
/// <summary>
/// Begin observing the condition for automatic completion.
/// </summary>
public void BeginObservingAutoComplete() => _autoCompleteStarted = true;
/// <summary>
/// Gets or sets how frequently to check for the auto-complete condition.
/// </summary>
public TimeSpan CheckAutoCompleteInterval
{
get { return TimeSpan.FromMilliseconds(_intervalMilliseconds); }
set
{
int milliseconds = checked((int)value.TotalMilliseconds);
if (milliseconds < 0) throw new ArgumentOutOfRangeException();
_intervalMilliseconds = milliseconds;
}
}
public void Add(T item, CancellationToken cancellationToken = default)
=> _queue.Add(item, cancellationToken);
public bool TryAdd(T item) => _queue.TryAdd(item);
public IEnumerable<T> GetConsumingEnumerable(
CancellationToken cancellationToken = default)
{
_barrier.AddParticipant();
try
{
while (true)
{
if (!_autoCompleteStarted)
{
if (_queue.TryTake(out var item, _intervalMilliseconds,
cancellationToken))
yield return item;
}
else
{
if (_queue.TryTake(out var item, 0, cancellationToken))
yield return item;
else if (_barrier.SignalAndWait(_intervalMilliseconds,
cancellationToken))
break;
}
}
}
finally { _barrier.RemoveParticipant(); }
}
IEnumerator<T> IEnumerable<T>.GetEnumerator()
=> ((IEnumerable<T>)_queue).GetEnumerator();
IEnumerator IEnumerable.GetEnumerator()
=> ((IEnumerable<T>)_queue).GetEnumerator();
public void Dispose() { _barrier.Dispose(); _queue.Dispose(); }
}
应该在添加集合中的初始项之后调用BeginObservingAutoComplete
方法。在调用此方法之前,不检查自动完成条件.
默认情况下,CheckAutoCompleteInterval
是500毫秒,并且可以在任何时候进行配置。
Take
和TryTake
方法是故意丢失的。集合打算通过GetConsumingEnumerable
方法来使用。这样,集合将跟踪当前订阅的使用者,以便知道何时自动完成。消费者可以随时添加和删除。可以通过退出foreach
循环( break
/return
等)或异常来删除使用者。
用法示例:
private static void Main()
{
var bc = new AutoCompleteBlockingCollection<string>();
AddRandomStringsToBc(bc, 1000, true);
bc.BeginObservingAutoComplete();
Action action = () => DoSomething(bc);
var actions = Enumerable.Repeat(action, 4).ToArray();
Parallel.Invoke(actions);
}
private static void DoSomething(AutoCompleteBlockingCollection<string> bc)
{
foreach (var str in bc.GetConsumingEnumerable())
{
Console.WriteLine(str);
if (str[0] == 'A')
{
Console.WriteLine("Adding more strings...");
AddRandomStringsToBc(bc, 100);
}
}
}
该集合是线程安全的,但Dispose
方法除外。
https://stackoverflow.com/questions/71708984
复制相似问题