我似乎找不到这方面的任何文档,但我知道AkkaStreams存储用于在内存中调用groupBy时将流分组为子流的键。是否有可能从子流中提取这些密钥?假设我从我的主流创建了一组子流,通过一个折叠来计算每个子流中的对象,然后将计数存储在一个类中。我能不能让子流的键也传递给那个类呢?还是有更好的方法来做这件事?我需要计算每个子流中的每个元素,但也需要存储计数属于哪个组。
发布于 2020-11-16 21:06:37
溪流食谱中显示了一个很好的示例
val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first
.groupBy(MaximumDistinctWords, identity)
//transform each element to pair with number of words in it
.map(_ -> 1)
// add counting logic to the streams
.reduce((l, r) => (l._1, l._2 + r._2))
// get a stream of word counts
.mergeSubstreams
然后是:
val words = Source(List("Hello", "world", "let's", "say", "again", "Hello", "world"))
counts.runWith(Sink.foreach(println))
将印刷:
(world,2)
(Hello,2)
(let's,1)
(again,1)
(say,1)
我想到的另一个例子是,用剩下的数来计算数字。因此,如下所示:
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(e => e % 4 -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams.to(Sink.foreach(println)).run()
将印刷:
(0,26)
(1,26)
(2,25)
(3,25)
https://stackoverflow.com/questions/64864519
复制相似问题