我有一个来自数据库的数据流,我希望使用Java来迭代和选择其中的X
数量并将其输出到另一个流。Order 不重要。
与…有关的东西
source
.collect(x -> collectNItems(10)) // basically from the stream choose 10 items
.flatmap(collectedItems -> Stream.of(collectedItems))
.map(x -> buildElasticSearchBatchInsertRequest(x))
.forEach(request -> insertToElasticSearch(request));
说源是1,2,3,4,5,6,7,8,9,10,11,12,13,14
在flatMap之后,我应该得到一个
(12, 1, 14, 3)
(2, 4, 5 , 8, 7, 6, 10, 11, 9, 13)
(再说一遍,顺序并不重要)我只需要把它分组在一起。
这方面的主要用例是根据数据库数据对Elasticsearch执行批量插入,因为每次插入都比较慢,而批量插入整个过程将消耗大量内存。
发布于 2020-04-14 00:51:16
在https://code-examples.net/en/q/1d38ce7发现了一个
要使用
@Test
public void try2() {
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
BatchSpliterator.decorate(stream, 10)
.forEach(System.out::println);
}
和BatchSpliterator
装饰器,只是稍微调整了一下原来的链接,因为估计大小应该返回MAX_VALUE
(如果未知)。
public class BatchSpliterator<E> implements Spliterator<List<E>> {
public static <E> Stream<List<E>> decorate(Stream<E> originalStream, int batchSize) {
return StreamSupport.stream(new BatchSpliterator<>(originalStream.spliterator(), batchSize), originalStream.isParallel());
}
private final Spliterator<E> base;
private final int batchSize;
private BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
base.tryAdvance(batch::add);
}
if (batch.isEmpty()) {
return false;
}
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase==null ? null
:new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final long baseSize = base.estimateSize();
return baseSize==Long.MAX_VALUE ? baseSize
:(long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
@Override
public boolean hasCharacteristics(int characteristics) {
return base.hasCharacteristics(characteristics);
}
@Override
public Comparator<? super List<E>> getComparator() {
throw new UnsupportedOperationException("getComparator");
}
}
https://stackoverflow.com/questions/61191692
复制相似问题