试图通过直接在Flux上进行分组来理解分组的好处,而不是收集所有元素,然后使用Java流函数groupingBy (或Kotlin partition
,如下面的示例所示)对它们进行分组。
下面是两个例子:第一个是groupBy
,第二个是partition
。我想弄清楚什么时候该用什么。
@Test
fun groupByTwoLists() {
val numbers = (0..9).toList()
Flux.fromIterable(numbers)
.groupBy<Boolean> { it % 2 == 0 }
.flatMap { group: GroupedFlux<Boolean, Int> ->
if (group.key() == true) {
group.collectList().doOnNext { println("We are even") }
} else {
group.collectList().doOnNext { println("We are odd") }
}
}
.test()
.expectNextCount(2)
.verifyComplete()
}
@Test
fun partitionLists() {
val numbers = (0..9).toList()
val (even, odd) = numbers.partition { it % 2 == 0 }
val monoEven = Mono.just(even).doOnNext { println("We are even") }
val monoOdd = Mono.just(odd).doOnNext { println("We are odd") }
Flux.merge(monoEven, monoOdd)
.test()
.expectNextCount(2)
.verifyComplete()
}
发布于 2022-06-07 10:55:06
我现在明白了。
当您有一个想要对每个组进行不同处理的流时,流量(由Flux.groupBy
创建)是很好的。它允许您做诸如publishOn
a Scheduler
之类的事情。您可能希望在分组的每个类型上使用不同的Scheduler
。
我在最初的问题中给出的两个测试不是一个很好的例子,因为我使用的是一个有限的数字列表,而不是一个数据流。
下面是使用Flux.groupBy
https://levelup.gitconnected.com/creating-a-flux-of-fluxes-with-project-reactors-group-by-method-37200bfc2a的一个很好的例子。
https://stackoverflow.com/questions/72472839
复制相似问题