如何迭代“取(N)”--得到一个序列,每个内部序列都有下一个N元素?
我正在用Kotlin写一个高负荷的应用程序。
我有成千上万的条目要插入到数据库中。
我想把它们分批,比如说1000。
所以我创建了一个循环:
val itemsSeq = itemsList.iterator().asSequence()
while (true) {
log.debug("Taking $BATCH_SIZE from $itemsSeq")
val batchSeq = itemsSeq.take(BATCH_SIZE)
val squareBatch = applySomething(batchSeq, something)
?: break
}
fun applySomething(batch: Sequence<Item>, something: Something) {
/* Fully consumes batch. Bulk-loads from DB by IDs, applies, bulk-saves. */
}我认为take()将推进itemsSeq,下一次对take()的调用将给出从第10项开始的itemsSeq序列“视图”。
但是有了这个代码,我得到了:
DEBUG Taking 10 from kotlin.sequences.ConstrainedOnceSequence@53fe15ff
Exception in thread "main" java.lang.IllegalStateException: This sequence can be consumed only once.
at kotlin.sequences.ConstrainedOnceSequence.iterator(SequencesJVM.kt:23)
at kotlin.sequences.TakeSequence$iterator$1.<init>(Sequences.kt:411)
at kotlin.sequences.TakeSequence.iterator(Sequences.kt:409)因此,take()似乎再次“打开”了itemsSeq,而它只能使用一次。
作为解决办法,我可以使用chunked()
public fun <T> Sequence<T>.chunked(size: Int): Sequence<List<T>> {但是我更喜欢创建List,而不是Sequence。
我要找的是take() 和 chunked()**.**之间的东西
Kotlin SDK里有这样的东西吗?
我可以创建自己的sequence { ... },但为了可读性,我更喜欢内置的东西。
发布于 2022-07-12 10:26:04
有一种方法可以通过将序列交给Iterator来构造序列,请参阅序列。
给定迭代器函数,构造一个序列,通过该函数提供的iterator返回值。这些值是延迟计算的,并且序列可能是无限的。
它封装在一个扩展函数中,如下所示:
fun <T> Iterable<T>.toValuesThroughIteratorSequence(): Sequence<T> {
val iterator = this.iterator()
return Sequence { iterator }
}快速测试:
data class Test(val id: Int)
val itemsList = List(45) { Test(it) }
val batchSize = 10
val repetitions = itemsList.size.div(batchSize) + 1
val itemsSeq = itemsList.toValuesThroughIteratorSequence()
(0 until repetitions).forEach { index ->
val batchSeq = itemsSeq.take(batchSize)
println("Batch no. $index: " + batchSeq.map { it.id.toString().padStart(2, ' ') }.joinToString(" "))
}输出:
Batch no. 0: 0 1 2 3 4 5 6 7 8 9
Batch no. 1: 10 11 12 13 14 15 16 17 18 19
Batch no. 2: 20 21 22 23 24 25 26 27 28 29
Batch no. 3: 30 31 32 33 34 35 36 37 38 39
Batch no. 4: 40 41 42 43 44发布于 2022-07-12 16:24:37
背景
首先,我们需要意识到可以迭代的对象和表示“活动”或已经在运行的迭代过程的对象之间有很大的区别。第一组指Iterable (so List、Set和所有其他集合)、Array、Flow等。第二组主要是Iterator或旧Java Enumeration。在读取文件指针或数据库表与数据库游标时,这种差异也可以与文件指针进行比较。
Sequence 属于第一组。Sequence对象并不表示一个活动的、已经开始的迭代,而是一组元素。这些元素可以懒洋洋地产生,序列可以具有无界大小,并且通常在内部使用迭代器工作,但在概念上序列本身并不是迭代器。
如果我们观察一下关于序列的文档,它就会清楚地将它们与Iterable而不是Iterator进行比较。所有构建序列的标准方法,比如:sequenceOf()、sequence {}、Iterable.asSequence(),都会产生每次迭代它们时都会返回相同的项目列表的序列。Iterator.asSequence()也遵循这种模式,但是由于它不能两次重新生成相同的项,所以它被有意保护以避免多次迭代:
public fun <T> Iterator<T>.asSequence(): Sequence<T> = Sequence { this }.constrainOnce()问题
您最初使用take()的尝试没有奏效,因为这是对序列的误用。我们期望对同一个序列对象的后续take()调用将产生完全相同的项(通常),而不是下一个项。类似地,正如我们预期的那样,列表上的多个take()调用总是产生相同的项,每次从一开始就开始。
更具体的说,您的错误是由上面的constrainOnce()引起的。当我们在一个序列上多次调用take()时,它必须从一开始就重新启动,但是如果它是从迭代器创建的,那么它就不能这样做,所以Iterator.asSequence()明确不允许这样做。
简单解
要解决这个问题,可以跳过@lukas.j建议的constrainOnce()部件。这个解决方案很好,因为stdlib已经提供了像Sequence.take()这样的工具,所以如果小心使用,这是最容易实现的,而且它只是起作用。
然而,我个人认为这是一种解决办法,因为结果序列的行为与序列不同。它更像是类固醇上的迭代器,而不是真正的序列。在与现有运算符或第三方代码一起使用此序列时,您需要小心,因为这种序列的工作方式可能与他们预期的不同,因此可能会得到不正确的结果。
高级解决方案
我们可以跟踪您最初使用后续take()调用的尝试。在这种情况下,我们的对象用于活动迭代,因此它不再是一个适当的序列,而是一个迭代器。我们在stdlib中错过的唯一方法是使用单个块创建子迭代器。我们可以自己执行:
fun main() {
val list = (0 .. 25).toList()
val iter = list.iterator()
while (iter.hasNext()) {
val chunk = iter.limited(10)
println(chunk.asSequence().toList())
}
}
fun <T> Iterator<T>.limited(n: Int): Iterator<T> = object : Iterator<T> {
var left = n
val iterator = this@limited
override fun next(): T {
if (left == 0)
throw NoSuchElementException()
left--
return iterator.next()
}
override fun hasNext(): Boolean {
return left > 0 && iterator.hasNext()
}
}我把它命名为limited(),因为take()建议我们从迭代器中读取条目。相反,我们只在提供的迭代器之上创建另一个迭代器。
当然,序列比迭代器更容易使用,解决这个问题的典型方法是使用chunked()。使用上面的limited(),实现chunkedAsSequences()非常简单
fun main() {
val list = (0 .. 25).toList()
list.asSequence()
.chunkedAsSequences(10)
.forEach { println(it.toList()) }
}
fun <T> Sequence<T>.chunkedAsSequences(size: Int): Sequence<Sequence<T>> = sequence {
val iter = iterator()
while (iter.hasNext()) {
val chunk = iter.limited(size)
yield(chunk.asSequence())
chunk.forEach {} // required if chunk was not fully consumed
}
}也请注意,有一个棘手的情况,块没有完全消耗。chunkedAsSequences()不受此情况的影响。以前的更简单的解决方案则不然。
https://stackoverflow.com/questions/72950095
复制相似问题