我希望有一个源,它在给定的时间间隔内评估一个函数并发出它的输出。作为一种解决办法,我可以使用Source.queue
+ offer
来实现它,但是还没有找到一种更干净的方法。理想情况下我会有这样的东西
def myFunction() = .... // function with side-effects
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick
有什么想法吗?
发布于 2017-04-24 14:33:14
也许最干净的方法是使用map
Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
发布于 2017-04-24 14:34:25
我想,throttle
是你所需要的。完全可运行的示例,其中Source
应用于迭代,它使用了next()
中的函数
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0
def myFunction(): Int = {
i = i + 1
i
}
import scala.collection.immutable.Iterable
val x: Iterable[Int] = new Iterable[Int] {
override def iterator: Iterator[Int] =
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = myFunction()
}
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)
throttle
参数:节流源,每秒1元素,最大突发= 1,在发出消息之前暂停以满足节流率(这是Shaping
的功能)。
https://stackoverflow.com/questions/43590101
复制相似问题