首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >通过BufferBlock的背压不起作用。(C# TPL数据流)

通过BufferBlock的背压不起作用。(C# TPL数据流)
EN

Stack Overflow用户
提问于 2020-05-22 09:06:05
回答 2查看 1K关注 0票数 0

典型情况:快制片,慢消费者,需要降低的制作速度。

不像我预期的那样工作的示例代码(下面解释):

代码语言:javascript
运行
复制
//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});

bb.LinkTo(ab);

ab.Completion.Wait();

我认为这段代码是如何工作的,但它不是:

  • BufferBlock bb是容量为3的阻塞队列。一旦达到容量,生产者就不能对其进行.Post(),直到出现空位为止。
    • 不是那样工作的。messages.

bb似乎很高兴地接受了任意数量的

  • producer是一个快速发布消息的任务。一旦所有消息都已发布,对bb.Complete()的调用应在处理完所有消息后通过管道传播和信号关闭。因此,在最后等待ab.Completion.Wait();
    • 也不起作用。一旦调用了messages.

,动作块ab就不会收到更多的.Complete()

可以用BlockingCollection来完成,我认为在TPL (TDF)世界中,BufferBlock相当于。我想我误解了在TPL数据流中背压是如何工作的。

,那么陷阱在哪里?如何运行此管道,在缓冲区bb**,中不允许超过3条消息并等待其完成?**

PS:我发现了这个gist (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938),它建议保持一个信号量来阻止对BufferBlock的写入。我以为这是“内置的”。

接受答案后的更新:

接受答案后更新:

如果您正在研究这个问题,您需要记住,ActionBlock也有自己的输入缓冲区。

那是一个人的。然后,您还需要认识到,因为所有块都有自己的输入缓冲区,所以您可能不需要BufferBlock,因为您可能认为它的名称意味着什么。BufferBlock更像是用于更复杂体系结构的实用程序块,或者类似于平衡加载块。但这不是背压缓冲器。

完成传播需要在链接级别显式地定义。

在调用时,.LinkTo()需要显式地传递new DataflowLinkOptions {PropagateCompletion = true}作为第二个参数。

EN

Stack Overflow用户

回答已采纳

发布于 2020-05-26 00:05:19

在JSteward答案的指导下,我得出了以下代码。它产生(阅读等)同时处理所述项目的新项目,维护预读缓冲区.当“生产者”没有更多的物品时,完成信号被发送到链子的头。程序还在等待整个链的完成,然后终止。

代码语言:javascript
运行
复制
static async Task Main() {

    string Time() => $"{DateTime.Now:hh:mm:ss.fff}";

    // the buffer is added to the chain just for demonstration purposes
    // the chain would work fine using just the built-in input buffer
    // of the `action` block.
    var buffer = new BufferBlock<int>(new DataflowBlockOptions {BoundedCapacity = 3});

    var action = new ActionBlock<int>(async i =>
    {
        Console.WriteLine($"[{Time()}]: Processing: {i}");
        await Task.Delay(500);
    }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 2, BoundedCapacity = 2});

    // it's necessary to set `PropagateCompletion` property
    buffer.LinkTo(action, new DataflowLinkOptions {PropagateCompletion = true});

    //Producer
    foreach (var i in Enumerable.Range(0, 10))
    {
        Console.WriteLine($"[{Time()}]: Ready to send: {i}");
        await buffer.SendAsync(i);
        Console.WriteLine($"[{Time()}]: Sent: {i}");
    }

    // we call `.Complete()` on the head of the chain and it's propagated forward
    buffer.Complete(); 

    await action.Completion;
}
票数 2
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61951466

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档