首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >TPL数据流从所有传入节点(多个生产者,1个消费者)创建聚合结果数组

TPL数据流从所有传入节点(多个生产者,1个消费者)创建聚合结果数组
EN

Stack Overflow用户
提问于 2018-08-29 17:49:15
回答 2查看 1.2K关注 0票数 2

请注意以下代码示例。我需要一个聚合器节点,可以链接到任何数量的来源,等待所有来源发送一条消息,然后组合在一个result[]中。

这应该是显而易见和直截了当的,但不知何故,我找不到解决方案。我检查了JoinBlock和TransformaterBlock,但两者似乎都不合适。

代码语言:javascript
运行
复制
using System;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp2
{
    internal class Program
    {
        private static readonly uint _produceCount = 0;
        private static void Main(string[] args)
        {

            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();

            // a block is required that accepts n sources as input, waits for all inputs to arrive, and then creates a result array from all inputs

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(String.Join(',', inputs));
            });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // desired output:
            // "Produce 1.1, Produce 2.1"
            // actually the order is of no importance at this time

        }


    }
}

编辑进一步的说明:我希望有一个块:-动态等待所有源注释(在第一条消息到达的时间点)完成并聚合结果以传递给追随者节点

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-08-29 23:36:13

为此,您可以使用非贪婪的BatchBlock。通过非贪婪,每个源将为批处理贡献一项。这是originally suggested here。下面是一个经过测试的示例:注意作为证据,source1被发送了多个没有出现在批次中的项目:

代码语言:javascript
运行
复制
public class DataAggregator
{
    private BatchBlock<string> batchBlock = new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });
    private ActionBlock<string[]> writer = new ActionBlock<string[]>(strings => strings.ToList().ForEach(str => Console.WriteLine(str)));
    private BufferBlock<string> source1 = new BufferBlock<string>();
    private BufferBlock<string> source2 = new BufferBlock<string>();
    private BufferBlock<string> source3 = new BufferBlock<string>();
    private BufferBlock<string> source4 = new BufferBlock<string>();
    private BufferBlock<string> source5 = new BufferBlock<string>();

    public DataAggregator()
    {
        source1.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source2.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source3.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source4.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source5.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    [Test]
    public async Task TestPipeline()
    {
        source1.Post("string1-1");
        source1.Post("string1-2");
        source1.Post("string1-3");
        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1
        source1.Complete();
        source2.Complete();
        source3.Complete();
        source4.Complete();
        source5.Complete();
        await writer.Completion;
    }
}

输出:

代码语言:javascript
运行
复制
string1-1
string2-1
string3-1
string4-1
string5-1
票数 1
EN

Stack Overflow用户

发布于 2018-08-29 18:48:39

如果你事先知道你的资源,我会把JoinBlockTransformBlock一起使用。您必须为每个源创建一个BufferBlock

首先,JoinBlock等待来自每个源的一条消息,并将它们打包到一个元组中。然后,TransformBlock从中间元组创建一个结果数组。

如果你事先不知道你的源码,你需要解释你如何期望你的新代码块知道什么时候产生结果。然后,应该将该逻辑放入自定义块中,可能是以TransformManyBlock<string,string[]>的形式。

如果你想加入一个动态数量的源,你可以像这样创建一个无限的加入块:

代码语言:javascript
运行
复制
private static void Main()
{
    var source1 = new BufferBlock<string>();
    var source2 = new BufferBlock<string>();
    var source3 = new BufferBlock<string>();
    var aggregator = CreateAggregatorBlock( 3 );
    var result = new ActionBlock<string[]>( x => Console.WriteLine( string.Join( ", ", x ) ) );
    source1.LinkTo( aggregator );
    source2.LinkTo( aggregator );
    source3.LinkTo( aggregator );
    aggregator.LinkTo( result );

    source1.Post( "message 1" );
    source2.Post( "message 2" );
    source3.Post( "message 3" );

    Console.ReadLine();
}

private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
    var buffer = new List<string>();
    return new TransformManyBlock<string, string[]>( message => {
        buffer.Add( message );
        if( buffer.Count == sources )
        {
            var result = buffer.ToArray();
            buffer.Clear();
            return new[] {result};
        }
        return Enumerable.Empty<string[]>();
    } );
}

这假设您的源以相同的速率生成消息。如果不是这样,您需要在消息旁边显示源的身份,并为每个源设置一个缓冲区。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52074457

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档