首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >BlockingCollection,其中消费者也是生产者

BlockingCollection,其中消费者也是生产者
EN

Stack Overflow用户
提问于 2022-04-01 15:00:54
回答 1查看 456关注 0票数 1

我有一堆请求要处理,在处理这些请求时,可以生成更多的“子请求”,并将其添加到相同的阻塞集合中。使用者将子请求添加到队列中。

很难知道何时退出消费循环:显然没有线程可以调用BlockingCollection.CompleteAdding,因为其他线程可能会向集合添加一些内容。您也不能仅仅因为BlockingCollection是空的而退出消费循环,因为另一个线程可能刚刚读取了来自BlockingCollection的最后剩余请求,并且将开始生成更多的请求-- BlockingCollectionCount将再次从零增加。

到目前为止,我对此的唯一想法是使用Barrier --当所有线程都到达Barrier时,BlockingCollection中就没有任何东西了,并且没有线程能够生成新的请求。这是我的代码--这是一个可以接受的方法吗?(请注意:这是高度人为的代码块建模,一种更为复杂的情况:没有程序员真正编写处理随机字符串的代码)

代码语言:javascript
运行
复制
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace Barrier1
{
    class Program
    {
        private static readonly Random random = new Random();
        private static void Main()
        {
            var bc = new BlockingCollection<string>();
            AddRandomStringsToBc(bc, 1000, true);
            int nTasks = 4;
            var barrier = new Barrier(nTasks);
            Action a = () => DoSomething(bc, barrier);
            var actions = Enumerable.Range(0, nTasks).Select(x => a).ToArray();
            Parallel.Invoke(actions);
        }

        private static IEnumerable<char> GetC(bool includeA)
        {
            var startChar = includeA ? 'A' : 'B';
            var add = includeA ? 24 : 25;
            while (true)
            {
                yield return (char)(startChar + random.Next(add));
            }
        }

        private static void DoSomething(BlockingCollection<string> bc, Barrier barrier)
        {
            while (true)
            {
                if (bc.TryTake(out var str))
                {
                    Console.WriteLine(str);
                    if (str[0] == 'A')
                    {
                        Console.WriteLine("Adding more strings...");
                        AddRandomStringsToBc(bc, 100);
                    }
                }
                else
                {
                    // Can't exit the loop here just because there is nothing in the collection.
                    // A different thread may be just about to call AddRandomStringsToBc:
                    if (barrier.SignalAndWait(100))
                    {
                        break;
                    }
                }
            }
        }

        private static void AddRandomStringsToBc(BlockingCollection<string> bc, int n, bool startWithA = false, bool sleep = false)
        {
            var collection = Enumerable.Range(0, n).Select(x => string.Join("", GetC(startWithA).Take(5)));
            foreach (var c in collection)
            {
                bc.Add(c);
            }
        }
    }
}
EN

回答 1

Stack Overflow用户

发布于 2022-04-02 06:53:42

下面是一个类似于BlockingCollection的集合,它的不同之处在于它自动完成,而不是依赖于手动调用CompleteAdding方法。自动完成的条件是集合为空,并且所有使用者都处于等待状态。

该实现基于您聪明的想法,即使用Barrier作为检查自动完成条件的机制。它并不完美,因为它依赖于池,而池是在集合变为空并且有一些仍处于活动状态的使用者时发生的。另一方面,它允许利用BlockingCollection<T>类的所有现有功能,而不是从头重写它:

代码语言:javascript
运行
复制
/// <summary>
/// A blocking collection that completes automatically when it's empty, and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T> : IEnumerable<T>, IDisposable
{
    private readonly BlockingCollection<T> _queue;
    private readonly Barrier _barrier;
    private volatile bool _autoCompleteStarted;
    private volatile int _intervalMilliseconds = 500;

    public AutoCompleteBlockingCollection(int boundedCapacity = -1)
    {
        _queue = boundedCapacity == -1 ? new() : new(boundedCapacity);
        _barrier = new(0, _ => _queue.CompleteAdding());
    }

    public int Count => _queue.Count;
    public int BoundedCapacity => _queue.BoundedCapacity;
    public bool IsAddingCompleted => _queue.IsAddingCompleted;
    public bool IsCompleted => _queue.IsCompleted;

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete() => _autoCompleteStarted = true;

    /// <summary>
    /// Gets or sets how frequently to check for the auto-complete condition.
    /// </summary>
    public TimeSpan CheckAutoCompleteInterval
    {
        get { return TimeSpan.FromMilliseconds(_intervalMilliseconds); }
        set
        {
            int milliseconds = checked((int)value.TotalMilliseconds);
            if (milliseconds < 0) throw new ArgumentOutOfRangeException();
            _intervalMilliseconds = milliseconds;
        }
    }

    public void Add(T item, CancellationToken cancellationToken = default)
        => _queue.Add(item, cancellationToken);

    public bool TryAdd(T item) => _queue.TryAdd(item);

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        _barrier.AddParticipant();
        try
        {
            while (true)
            {
                if (!_autoCompleteStarted)
                {
                    if (_queue.TryTake(out var item, _intervalMilliseconds,
                        cancellationToken))
                        yield return item;
                }
                else
                {
                    if (_queue.TryTake(out var item, 0, cancellationToken))
                        yield return item;
                    else if (_barrier.SignalAndWait(_intervalMilliseconds,
                        cancellationToken))
                        break;
                }
            }
        }
        finally { _barrier.RemoveParticipant(); }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    public void Dispose() { _barrier.Dispose(); _queue.Dispose(); }
}

应该在添加集合中的初始项之后调用BeginObservingAutoComplete方法。在调用此方法之前,不检查自动完成条件.

默认情况下,CheckAutoCompleteInterval是500毫秒,并且可以在任何时候进行配置。

TakeTryTake方法是故意丢失的。集合打算通过GetConsumingEnumerable方法来使用。这样,集合将跟踪当前订阅的使用者,以便知道何时自动完成。消费者可以随时添加和删除。可以通过退出foreach循环( break/return等)或异常来删除使用者。

用法示例:

代码语言:javascript
运行
复制
private static void Main()
{
    var bc = new AutoCompleteBlockingCollection<string>();
    AddRandomStringsToBc(bc, 1000, true);
    bc.BeginObservingAutoComplete();
    Action action = () => DoSomething(bc);
    var actions = Enumerable.Repeat(action, 4).ToArray();
    Parallel.Invoke(actions);
}

private static void DoSomething(AutoCompleteBlockingCollection<string> bc)
{
    foreach (var str in bc.GetConsumingEnumerable())
    {
        Console.WriteLine(str);
        if (str[0] == 'A')
        {
            Console.WriteLine("Adding more strings...");
            AddRandomStringsToBc(bc, 100);
        }
    }
}

该集合是线程安全的,但Dispose方法除外。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71708984

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档