典型情况:快制片,慢消费者,需要降低的制作速度。
不像我预期的那样工作的示例代码(下面解释):
// I assumed this block will behave like BlockingCollection, but it doesn't
var bb = new BufferBlock<int>(new DataflowBlockOptions {
BoundedCapacity = 3, // looks like this does nothing
});
// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
while (dataSource < 10) {
var message = ++dataSource;
bb.Post(message);
Console.WriteLine($"Posted: {message}");
}
Console.WriteLine("Calling .Complete() on buffer block");
bb.Complete();
});
// slow consumer
var ab = new ActionBlock<int>(i => {
Thread.Sleep(500);
Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 2,
});
bb.LinkTo(ab);
ab.Completion.Wait();
我认为这段代码是如何工作的,但它不是:
BufferBlock
bb
是容量为3的阻塞队列。一旦达到容量,生产者就不能对其进行.Post()
,直到出现空位为止。 bb
似乎很高兴地接受了任意数量的
producer
是一个快速发布消息的任务。一旦所有消息都已发布,对bb.Complete()
的调用应在处理完所有消息后通过管道传播和信号关闭。因此,在最后等待ab.Completion.Wait();
。,动作块ab
就不会收到更多的.Complete()
可以用BlockingCollection
来完成,我认为在TPL (TDF)世界中,BufferBlock
相当于。我想我误解了在TPL数据流中背压是如何工作的。
,那么陷阱在哪里?如何运行此管道,在缓冲区bb
**,中不允许超过3条消息并等待其完成?**
PS:我发现了这个gist (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938),它建议保持一个信号量来阻止对BufferBlock
的写入。我以为这是“内置的”。
接受答案后的更新:
接受答案后更新:
如果您正在研究这个问题,您需要记住,ActionBlock
也有自己的输入缓冲区。
那是一个人的。然后,您还需要认识到,因为所有块都有自己的输入缓冲区,所以您可能不需要BufferBlock
,因为您可能认为它的名称意味着什么。BufferBlock
更像是用于更复杂体系结构的实用程序块,或者类似于平衡加载块。但这不是背压缓冲器。
完成传播需要在链接级别显式地定义。
在调用时,.LinkTo()
需要显式地传递new DataflowLinkOptions {PropagateCompletion = true}
作为第二个参数。
发布于 2020-05-26 00:05:19
在JSteward答案的指导下,我得出了以下代码。它产生(阅读等)同时处理所述项目的新项目,维护预读缓冲区.当“生产者”没有更多的物品时,完成信号被发送到链子的头。程序还在等待整个链的完成,然后终止。
static async Task Main() {
string Time() => $"{DateTime.Now:hh:mm:ss.fff}";
// the buffer is added to the chain just for demonstration purposes
// the chain would work fine using just the built-in input buffer
// of the `action` block.
var buffer = new BufferBlock<int>(new DataflowBlockOptions {BoundedCapacity = 3});
var action = new ActionBlock<int>(async i =>
{
Console.WriteLine($"[{Time()}]: Processing: {i}");
await Task.Delay(500);
}, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 2, BoundedCapacity = 2});
// it's necessary to set `PropagateCompletion` property
buffer.LinkTo(action, new DataflowLinkOptions {PropagateCompletion = true});
//Producer
foreach (var i in Enumerable.Range(0, 10))
{
Console.WriteLine($"[{Time()}]: Ready to send: {i}");
await buffer.SendAsync(i);
Console.WriteLine($"[{Time()}]: Sent: {i}");
}
// we call `.Complete()` on the head of the chain and it's propagated forward
buffer.Complete();
await action.Completion;
}
https://stackoverflow.com/questions/61951466
复制相似问题