首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Fs2.流坚持两次

Fs2.流坚持两次
EN

Stack Overflow用户
提问于 2021-08-17 12:30:01
回答 1查看 114关注 0票数 0

问题:

我想重复地从某个第三方库提供的fs2.Stream中获取一些批,从而将客户端从fs2.Stream本身抽象出来,并在它们准备好后立即给它们简单的F[List[Int]]批。

尝试:我尝试使用fs2.Stream::take并运行了一些示例。

I.

代码语言:javascript
运行
复制
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()

r.unsafeRunSync()

它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。我预计从01000的所有批次都会被打印出来。

在这里保持简单一点是

II.

代码语言:javascript
运行
复制
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
  _ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()

r.unsafeRunSync()

这种行为与I完全相同。打印List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。

问题:

给定一个fs2.Stream[IO, Int],如何提供一个在计算时遍历由流提供的连续批的效果IO[List[Int]]

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-17 12:58:09

好吧,您不能有一个表示多个批的IO[List[X]]IO将是一个单独的批处理。

你能做的最好的事情就是这样:

代码语言:javascript
运行
复制
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]

也就是说,您的用户将给您一个操作来执行每一批,您将给他们一个IO,它将阻塞当前的光纤,直到使用该函数消耗整个流为止。

实现这一职能的简单方法是:

代码语言:javascript
运行
复制
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
  getStreamFromThirdParty
    .chunkN(n = ChunkSize)
    .evalMap(chunk => process(chunk.toList))
    .compile
    .drain
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68817411

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档