我正在尝试实现一个具有以下签名的方法:
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator);方法的目标是将每个流类型平平为单个流,并将输出打包成一对。我只有一个Iterator (不是Iterable),我不能更改方法签名,所以我必须在一个迭代中执行扁平化。
我目前最好的实现是
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>> iterator) {
Stream<A> aStream = Stream.empty();
Stream<B> bStream = Stream.empty();
while(iterator.hasNext()) {
Pair<Stream<A>, Stream<B>> elm = iterator.next();
aStream = Stream.concat(aStream, elm.first);
bStream = Stream.concat(bStream, elm.second);
}
return Pair.of(aStream, bStream);
}但是,虽然这在技术上是正确的,但我对此并不十分满意,原因有两个:
感觉Stream#flatMap应该适合这里(在使用番石榴Streams.stream(石榴)将输入Iterator转换为流之后),但由于中间的对类型,它似乎无法工作。
另一个要求是,任何迭代器/流都可能非常大(例如,输入可以包含从一对非常大的流到许多项流),因此理想情况下,解决方案不应该将结果收集到内存集合中。
发布于 2017-06-24 12:06:16
避免收集整个Iterator (就像您在问题中实际做的那样)是相当困难的,因为您不知道如何使用产生的流:一个可以完全消耗,需要完全消耗迭代器,而另一个则根本不消耗,需要跟踪生成的所有对--在某个地方有效地收集它们。
只有当流以“速度”或多或少地被消耗时,才能从不收集整个迭代器中获益。但是这种消耗意味着要么使用结果流之一的迭代器,要么在并行线程中使用流-这将需要额外的同步。
因此,我建议将所有对收集成一个List,然后从该列表中生成新的Pair:
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
Iterable<Pair<Stream<A>, Stream<B>>> iterable = () -> iterator;
final List<Pair<Stream<A>, Stream<B>>> allPairs =
StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.toList());
return Pair.of(
allPairs.stream().flatMap(p -> p.first),
allPairs.stream().flatMap(p -> p.second)
);
}这还没有消耗任何原始流,同时保持一个避免嵌套流连接的简单解决方案。
发布于 2017-06-24 20:00:52
首先,这将是您的代码的“更多功能”版本,您会说您更喜欢在风格上:
<A, B> Pair<Stream<A>, Stream<B>> flattenFunctional(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
return Streams.stream(iterator)
.reduce(Pair.of(Stream.empty(), Stream.empty()),
(a, b) -> Pair.of(
Stream.concat(a.first, b.first),
Stream.concat(a.second, b.second)));
}在使用StackOverflowError时,有关可能的Stream.concat的警告仍然适用于这里。
为了避免这种情况,并考虑到大型数据集的性能和内存使用,我有以下建议(一点也不实用)。您可以创建一对自定义Iterator (用于A、B类型),并使用番石榴的Streams.stream()获取一对流。将这些自定义迭代器放入具有一对迭代器的类中。例如,如果在iterator的第一对中,Stream<A>的元素比Stream<B>少,那么在Stream<A>耗尽之后,调用iterator.next()并将B的迭代器推到它的堆栈中。下面是具有一对堆栈的类(添加一个构造函数):
class PairStreamIterator<A, B> {
private final Iterator<Pair<Stream<A>, Stream<B>>> iterator;
private final Queue<Iterator<A>> stackA = new ArrayDeque<>();
private final Queue<Iterator<B>> stackB = new ArrayDeque<>();
Iterator<A> getItA() {
return new Iterator<A>() {
@Override public boolean hasNext() {
if (!stackA.isEmpty() && !stackA.peek().hasNext()) {
stackA.remove();
return hasNext();
} else if (!stackA.isEmpty() && stackA.peek().hasNext()) {
return true;
} else if (iterator.hasNext()) {
Pair<Stream<A>, Stream<B>> pair = iterator.next();
stackA.add(pair.first.iterator());
stackB.add(pair.second.iterator());
return hasNext();
}
return false;
}
@Override public A next() {
return stackA.peek().next();
}
};
}
// repeat for Iterator<B>
}flatten方法:
<A, B> Pair<Stream<A>, Stream<B>> flattenIt(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
final PairStreamIterator<A, B> pair = new PairStreamIterator<>(iterator);
return Pair.of(Streams.stream(pair.getItA()), Streams.stream(pair.getItB()));
}如果您以相同的速率使用flatten结果对中的两个流,那么这两个堆栈通常会容纳1或2个迭代器。最糟糕的情况是,如果您计划完全使用结果对的一个流,然后再使用另一个流。在这种情况下,第二个扁平流所需的所有迭代器都将保留在迭代器堆栈中。我不认为有什么办法可以绕过我的恐惧。由于这些都存储在内存中的堆中,所以您将不会得到StackOverflowError,尽管您仍然可以得到OutOfMemoryError
一个可能的警告是在hasNext中使用递归。只有当您在输入中遇到许多连续的空流时,才会出现问题。
https://stackoverflow.com/questions/44735562
复制相似问题