首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Scala节流器改变每条消息之间的时间

使用Scala节流器改变每条消息之间的时间
EN

Stack Overflow用户
提问于 2021-01-30 23:12:43
回答 1查看 46关注 0票数 2

我正在学习Akka,并编写了以下代码来限制发送到TradeAction参与者的消息数量。每秒最多发送3条消息。是否可以修改节流器,以便设置每条消息之间的时间?例如,message1和message2之间延迟2秒,message2和message3之间延迟1秒。

代码语言:javascript
复制
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import systemconfig.ActorSystemConfig

import scala.concurrent.duration.DurationInt

case class Action(side: String)

object PlaceTrade {

  val actorSystem = ActorSystem("firstActorSystem")
  println(actorSystem.name)

  object TradeAction {
    def props(action : String) = Props(new TradeAction(action))
  }

  class TradeAction(actorName: String) extends Actor {
    override def receive: Receive = {
      case "Buy" => {
        val r = requests.get("http://www.google.com")
        println("r status code is "+r.statusCode)
        println("Buy")
        println("")
      }
      case "Sell" => {
        val r = requests.get("http://www.google.com")
        println("r status code is "+r.statusCode)
        println("Sell")
        println("")
      }
      case _ =>
    }
  }

  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(ac: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 3.second)
    .to(Sink.actorRef(ac, NotUsed))
    .run()

  def main(args: Array[String]): Unit = {
    val tradeAction = actorSystem.actorOf(TradeAction.props("TradeAction"))
    val throttler = getThrottler(tradeAction)

    val l = List(Action("Buy"),Action("Buy"),Action("Buy"),Action("Sell"))
    l.foreach(action => {
      throttler ! action.side
    })
  }


}
EN

Stack Overflow用户

回答已采纳

发布于 2021-01-31 00:33:50

您可以使用delayWith来实现这一点,它允许一个(可能有状态的)方法来定义此元素延迟多长时间(无需重新排序元素),例如:

代码语言:javascript
复制
import akka.stream.scaladsl.{ DelayStrategy, DelayOverflowStrategy }
import scala.concurrent.duration.FiniteDuration

def decliningDelay(): DelayStrategy[Any] =
  new DelayStrategy {
    var nextDelaySeconds: Option[Int] = None

    def nextDelay(elem: Any): FiniteDuration =
      nextDelaySeconds match {
        case None =>
          nextDelaySeconds = 2
          0.seconds
        case Some(delay) if delay > 0 =>
          nextDelaySeconds = delay - 1
          delay.seconds
        case _ => 0.seconds
      }
  }

def getThrottler(ac: ActorRef): ActorRef =
  Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .delayWith(() => decliningDelay(), DelayOverflowStrategy.backpressure)
    .to(Sink.actorRef(ac, NotUsed))
    .run()

您可以将delayWiththrottle结合使用;我可能会将delayWith放在throttle之前。

票数 3
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65969576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档