首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kotlin:用N个项目将Sequence<T>拆分成Sequence<Sequence<T>>?

Kotlin:用N个项目将Sequence<T>拆分成Sequence<Sequence<T>>?
EN

Stack Overflow用户
提问于 2022-07-12 09:32:38
回答 2查看 227关注 0票数 3

如何迭代“取(N)”--得到一个序列,每个内部序列都有下一个N元素?

我正在用Kotlin写一个高负荷的应用程序。

我有成千上万的条目要插入到数据库中。

我想把它们分批,比如说1000。

所以我创建了一个循环:

代码语言:javascript
复制
    val itemsSeq = itemsList.iterator().asSequence()
    while (true) {
        log.debug("Taking $BATCH_SIZE from $itemsSeq")
        val batchSeq = itemsSeq.take(BATCH_SIZE)
        val squareBatch = applySomething(batchSeq, something)
            ?: break
    }

    fun applySomething(batch: Sequence<Item>, something: Something) {
        /* Fully consumes batch. Bulk-loads from DB by IDs, applies, bulk-saves. */ 
    }

我认为take()将推进itemsSeq,下一次对take()的调用将给出从第10项开始的itemsSeq序列“视图”。

但是有了这个代码,我得到了:

代码语言:javascript
复制
DEBUG Taking 10 from kotlin.sequences.ConstrainedOnceSequence@53fe15ff
Exception in thread "main" java.lang.IllegalStateException: This sequence can be consumed only once.
    at kotlin.sequences.ConstrainedOnceSequence.iterator(SequencesJVM.kt:23)
    at kotlin.sequences.TakeSequence$iterator$1.<init>(Sequences.kt:411)
    at kotlin.sequences.TakeSequence.iterator(Sequences.kt:409)

因此,take()似乎再次“打开”了itemsSeq,而它只能使用一次。

作为解决办法,我可以使用chunked()

代码语言:javascript
复制
public fun <T> Sequence<T>.chunked(size: Int): Sequence<List<T>> {

但是我更喜欢创建List,而不是Sequence

我要找的是take() chunked()**.**之间的东西

Kotlin SDK里有这样的东西吗?

我可以创建自己的sequence { ... },但为了可读性,我更喜欢内置的东西。

EN

回答 2

Stack Overflow用户

发布于 2022-07-12 10:26:04

有一种方法可以通过将序列交给Iterator来构造序列,请参阅序列

给定迭代器函数,构造一个序列,通过该函数提供的iterator返回值。这些值是延迟计算的,并且序列可能是无限的。

它封装在一个扩展函数中,如下所示:

代码语言:javascript
复制
fun <T> Iterable<T>.toValuesThroughIteratorSequence(): Sequence<T> {
  val iterator = this.iterator()
  return Sequence { iterator }
}

快速测试:

代码语言:javascript
复制
data class Test(val id: Int)

val itemsList = List(45) { Test(it) }
val batchSize = 10

val repetitions = itemsList.size.div(batchSize) + 1
val itemsSeq = itemsList.toValuesThroughIteratorSequence()

(0 until repetitions).forEach { index ->
  val batchSeq = itemsSeq.take(batchSize)
  println("Batch no. $index:   " + batchSeq.map { it.id.toString().padStart(2, ' ') }.joinToString(" "))
}

输出:

代码语言:javascript
复制
Batch no. 0:    0  1  2  3  4  5  6  7  8  9
Batch no. 1:   10 11 12 13 14 15 16 17 18 19
Batch no. 2:   20 21 22 23 24 25 26 27 28 29
Batch no. 3:   30 31 32 33 34 35 36 37 38 39
Batch no. 4:   40 41 42 43 44
票数 3
EN

Stack Overflow用户

发布于 2022-07-12 16:24:37

背景

首先,我们需要意识到可以迭代的对象和表示“活动”或已经在运行的迭代过程的对象之间有很大的区别。第一组指Iterable (so ListSet和所有其他集合)、ArrayFlow等。第二组主要是Iterator或旧Java Enumeration。在读取文件指针或数据库表与数据库游标时,这种差异也可以与文件指针进行比较。

Sequence 属于第一组。Sequence对象并不表示一个活动的、已经开始的迭代,而是一组元素。这些元素可以懒洋洋地产生,序列可以具有无界大小,并且通常在内部使用迭代器工作,但在概念上序列本身并不是迭代器。

如果我们观察一下关于序列的文档,它就会清楚地将它们与Iterable而不是Iterator进行比较。所有构建序列的标准方法,比如:sequenceOf()sequence {}Iterable.asSequence(),都会产生每次迭代它们时都会返回相同的项目列表的序列。Iterator.asSequence()也遵循这种模式,但是由于它不能两次重新生成相同的项,所以它被有意保护以避免多次迭代:

代码语言:javascript
复制
public fun <T> Iterator<T>.asSequence(): Sequence<T> = Sequence { this }.constrainOnce()

问题

您最初使用take()的尝试没有奏效,因为这是对序列的误用。我们期望对同一个序列对象的后续take()调用将产生完全相同的项(通常),而不是下一个项。类似地,正如我们预期的那样,列表上的多个take()调用总是产生相同的项,每次从一开始就开始。

更具体的说,您的错误是由上面的constrainOnce()引起的。当我们在一个序列上多次调用take()时,它必须从一开始就重新启动,但是如果它是从迭代器创建的,那么它就不能这样做,所以Iterator.asSequence()明确不允许这样做。

简单解

要解决这个问题,可以跳过@lukas.j建议的constrainOnce()部件。这个解决方案很好,因为stdlib已经提供了像Sequence.take()这样的工具,所以如果小心使用,这是最容易实现的,而且它只是起作用。

然而,我个人认为这是一种解决办法,因为结果序列的行为与序列不同。它更像是类固醇上的迭代器,而不是真正的序列。在与现有运算符或第三方代码一起使用此序列时,您需要小心,因为这种序列的工作方式可能与他们预期的不同,因此可能会得到不正确的结果。

高级解决方案

我们可以跟踪您最初使用后续take()调用的尝试。在这种情况下,我们的对象用于活动迭代,因此它不再是一个适当的序列,而是一个迭代器。我们在stdlib中错过的唯一方法是使用单个块创建子迭代器。我们可以自己执行:

代码语言:javascript
复制
fun main() {
    val list = (0 .. 25).toList()

    val iter = list.iterator()
    while (iter.hasNext()) {
        val chunk = iter.limited(10)
        println(chunk.asSequence().toList())
    }
}

fun <T> Iterator<T>.limited(n: Int): Iterator<T> = object : Iterator<T> {
    var left = n
    val iterator = this@limited

    override fun next(): T {
        if (left == 0)
            throw NoSuchElementException()
        left--
        return iterator.next()
    }

    override fun hasNext(): Boolean {
        return left > 0 && iterator.hasNext()
    }
}

我把它命名为limited(),因为take()建议我们从迭代器中读取条目。相反,我们只在提供的迭代器之上创建另一个迭代器。

当然,序列比迭代器更容易使用,解决这个问题的典型方法是使用chunked()。使用上面的limited(),实现chunkedAsSequences()非常简单

代码语言:javascript
复制
fun main() {
    val list = (0 .. 25).toList()

    list.asSequence()
        .chunkedAsSequences(10)
        .forEach { println(it.toList()) }
}

fun <T> Sequence<T>.chunkedAsSequences(size: Int): Sequence<Sequence<T>> = sequence {
    val iter = iterator()
    while (iter.hasNext()) {
        val chunk = iter.limited(size)
        yield(chunk.asSequence())
        chunk.forEach {} // required if chunk was not fully consumed
    }
}

也请注意,有一个棘手的情况,块没有完全消耗。chunkedAsSequences()不受此情况的影响。以前的更简单的解决方案则不然。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72950095

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档