问题:
我想重复地从某个第三方库提供的fs2.Stream中获取一些批,从而将客户端从fs2.Stream本身抽象出来,并在它们准备好后立即给它们简单的F[List[Int]]批。
尝试:我尝试使用fs2.Stream::take并运行了一些示例。
I.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。我预计从0到1000的所有批次都会被打印出来。
在这里保持简单一点是
II.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
_ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()
r.unsafeRunSync()这种行为与I完全相同。打印List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。
问题:
给定一个fs2.Stream[IO, Int],如何提供一个在计算时遍历由流提供的连续批的效果IO[List[Int]]?
发布于 2021-08-17 12:58:09
好吧,您不能有一个表示多个批的IO[List[X]],IO将是一个单独的批处理。
你能做的最好的事情就是这样:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]也就是说,您的用户将给您一个操作来执行每一批,您将给他们一个IO,它将阻塞当前的光纤,直到使用该函数消耗整个流为止。
实现这一职能的简单方法是:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
getStreamFromThirdParty
.chunkN(n = ChunkSize)
.evalMap(chunk => process(chunk.toList))
.compile
.drainhttps://stackoverflow.com/questions/68817411
复制相似问题