首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用多个元素阻止集合?

用多个元素阻止集合?
EN

Stack Overflow用户
提问于 2016-09-16 21:28:11
回答 3查看 3.2K关注 0票数 3

生产者-消费者集合1实现的所有C#似乎都有类似于以下接口的接口:

代码语言:javascript
运行
复制
private Queue<T> items;

public void Produce(T item)
public T Consume()

有像下面这样的实现吗?

代码语言:javascript
运行
复制
private Queue<T> items;

public void Produce(T[] item)
public T[] Consume(int count)

希望这样可以让我一次生产/消费不同数量的商品,而不需要对每个项目进行过多的锁定。这似乎是生产/消费大量商品所必需的,但我还没有找到任何实现。

1

2

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2016-09-16 23:05:46

有多种可能的方法,取决于您到底想要实现什么。

IProducerConsumerCollection接口的实现。据我所知,.NET框架中这个接口的唯一线程安全实现是BlockingCollection

这个类允许有阻塞或非阻塞的生产者和消费者.生产者端设置在阻塞和非阻塞之间,为构造函数中的集合提供容量限制。正如BlockingCollection.Add(T)方法的文档所述:

如果在初始化此BlockingCollection实例时指定了有界容量,则要添加的调用可能会阻塞,直到可用空间存储所提供的项为止。

对于获取项,您可以使用不同的TakeTryTake方法,或者非常方便的BlockingCollection.GetConsumingEnumerable()方法,该方法创建一个IEnumerable<T>,该方法在获取下一个值时从BlockingCollection<T>中消耗一个元素,并在源集合为空的情况下阻塞。也就是说,直到调用了BlockingCollection.CompleteAdding(),并且集合不接受任何新的数据。此时,所有使用可枚举实例的实例都将停止阻塞,并报告不再存在数据(一旦使用完所有剩余数据)。

因此,您基本上可以实现这样的消费者:

代码语言:javascript
运行
复制
BlockingCollection<...> bc = ...
foreach (var item in bc.GetConsumingEnumerable())
{
    // do something with your item
}

这样的使用者可以在多个线程中启动,因此如果选择从源读取多个线程。您可以创建任意数量的消费枚举。

您应该知道,这个集合实际上只是一个包装器。有一个构造函数允许您设置使用的集合类型。默认情况下,ConcurrentQueue。这意味着,默认情况下,集合的行为与此队列类似,并且是先入先出的集合,以防您只使用一个生产者和一个使用者。

尽管如此,还有另一种选择。如果您不需要阻塞部分(或者您想自己实现阻塞部分),并且如果您不需要集合中任何元素的顺序,那么就有ConcurrentBag。此集合非常有效地处理来自多个线程的访问。它在ThreadLocal包装器中使用较小的集合。因此,每个线程都使用自己的存储,并且只有当线程在自己的存储中用完项时,它才开始从另一个线程存储中获取项。

如果在用例中连续发生生产和消费,使用此集合可能会很有趣。因此,首先添加所有项,一旦完成,您将使用所有项,这两个项都具有多个线程。

票数 6
EN

Stack Overflow用户

发布于 2016-09-16 22:15:35

希望这样可以让我一次生产/消费不同数量的商品,而不需要对每个项目进行过多的锁定。

您可以使用BlockingCollection类;尽管它没有添加或接受多个项的方法,但它不使用内部锁。

票数 3
EN

Stack Overflow用户

发布于 2020-02-07 13:15:13

出于对此的需要,我自己创建了一个方法扩展。请注意,如果至少从队列中删除了一个元素,则此调用将记录任何进一步的异常,并返回这一个元素以防止任何丢失。

代码语言:javascript
运行
复制
public static class BlockingCollectionMethodExtensions
{
    public static List<T> FetchAtLeastOneBlocking<T>(this BlockingCollection<T> threadSafeQueue, int maxCount, ICommonLog log)
    {
        var resultList = new List<T>();

        // Take() will block the thread until new elements appear
        // It will also throw an InvalidOperationException when blockingCollection is Completed
        resultList.Add(threadSafeQueue.Take());

        try
        {
            // Fetch more unblocking
            while (threadSafeQueue.Count > 0 && resultList.Count < maxCount)
            {
                T item;
                bool success = false;
                success = threadSafeQueue.TryTake(out item);
                if (success)
                {
                    resultList.Add(item);
                }
                else
                {

                }
            }
        }
        catch (Exception ex)
        {
            log.Fatal($"Unknown error fetching more elements. Continuing to process the {resultList.Count} already fetched items.", ex);
        }

        return resultList;
    }
}

以及相应的测试:

代码语言:javascript
运行
复制
public class BlockingCollectionMethodExtensionsTest : UnitTestBase
{
    [Fact]
    public void FetchAtLeastOneBlocking_FirstEmpty_ThenSingleEntryAdded_ExpectBlocking_Test()
    {
        var queue = new BlockingCollection<int>();

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for 2 seconds to ensure that nothing will be fetched
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.Null(fetchResult);

        // Add a new element and verify that the fetch method succeeded
        queue.Add(78);

        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Single(fetchResult);
        Assert.Equal(78, fetchResult.Single());
    }

    [Fact]
    public void FetchAtLeastOneBlocking_FirstEmpty_ThenCompleted_ExpectOperationException_Test()
    {
        var queue = new BlockingCollection<int>();
        Exception catchedException = null;

        var startEvent = new ManualResetEvent(initialState: false);
        var exceptionEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            try
            {
                fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            }
            catch (Exception ex)
            {
                catchedException = ex;
                exceptionEvent.Set();

            }
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for 2 seconds to ensure that nothing will be fetched
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.Null(fetchResult);

        // Now complete the queue and assert that fetching threw the expected exception
        queue.CompleteAdding();

        // Wait for the exception to be thrown
        var exceptionSuccess = exceptionEvent.WaitOne(TimeSpan.FromSeconds(2));
        Assert.True(exceptionSuccess);
        Assert.NotNull(catchedException);
        Assert.IsType<InvalidOperationException>(catchedException);
    }

    [Fact]
    public void FetchAtLeastOneBlocking_SingleEntryExists_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Add a new element and verify that the fetch method succeeded
        queue.Add(78);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Single(fetchResult);
        Assert.Equal(78, fetchResult.Single());
    }

    [Fact]
    public void FetchAtLeastOneBlocking_MultipleEntriesExist_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Add a new element and verify that the fetch method succeeded
        queue.Add(78);
        queue.Add(79);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Equal(2, fetchResult.Count);
        Assert.Equal(78, fetchResult[0]);
        Assert.Equal(79, fetchResult[1]);
    }

    [Fact]
    public void FetchAtLeastOneBlocking_MultipleEntriesExist_MaxCountExceeded_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Add a new element and verify that the fetch method succeeded
        queue.Add(78);
        queue.Add(79);
        queue.Add(80);
        queue.Add(81);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Equal(3, fetchResult.Count);
        Assert.Equal(78, fetchResult[0]);
        Assert.Equal(79, fetchResult[1]);
        Assert.Equal(80, fetchResult[2]);
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39540330

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档