在Akka中,我尝试使用pipeTo,而不是在使用?创建的未来响应上使用onComplete,因为这应该是首选模式。然而,当未来超时时,我似乎不会收到任何Throwable或Failure。如果在使用pipeTo时发生超时,我应该期望在我的执行元中收到什么?当抛出不同的异常时会怎样呢?示例代码:
class Simple(otherActor : ActorRef) extends Actor{
def receive = {
case "some_msg" => {
val implicit timeout = Timeout(1 seconds)
val response = otherActor ? "hello"
response pipeTo self
}
// case ??? // How do I handle timeouts?
}
}
如果在发生超时时没有自动发送消息,我应该如何使用pipeTo处理超时?
发布于 2013-12-19 15:42:12
未来的失败将作为包含异常的akka.actor.Status.Failure
消息发送。超时的例外是akka.pattern.AskTimeoutException
。
发布于 2013-12-19 21:02:05
如果您的示例与您的实际代码非常匹配,那么我不确定pipeTo
是否就是您想要的。将消息传回给你自己,对我来说,没有太多意义,对于一个参与者向另一个参与者发送消息,然后等待响应的情况,有更好的解决方案。不过,首先让我们谈谈pipeTo
。我认为何时使用pipeTo
的一个很好的例子是,如果你有三个角色,A,B和C。A向B发送一条消息,B再向C发送一条消息,C的响应应该在B先对其执行其他操作后返回给A。在该示例中,您可以在B中执行以下操作:
val fut = actorC ? someMessage
fut map(someMapFunc) pipeTo sender
在这里,如果您使用诸如onComplete
之类的东西并响应该回调中的sender
,那么pipeTo
函数有助于防止您意外地关闭可变的sender
变量。
现在,对于您的情况,如果您只想让A与B对话,然后等待B的响应(并处理潜在的超时),您可以尝试如下所示:
class ActorA extends Actor{
import context._
val myB = context.actorOf(Props[ActorB])
def receive = {
case msg =>
myB ! msg
setReceiveTimeout(2 seconds)
become(waitingForResponse)
}
def waitingForResponse:Receive = {
case ReceiveTimeout =>
println("got a receive timeout")
cancelReceiveTimeout
case response =>
println("got my response back")
cancelReceiveTimeout
}
def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined)
}
在本例中,A以默认的receive
partial函数开始。当它接收到一条消息时,它会向B发送另一条消息,为从B接收响应设置接收超时,然后将它的receive
函数切换为特定于等待来自B的响应的函数。在这个新的接收函数中,我可以及时从B获得响应,也可以获得一个ReceiveTimeout
,这表明我没有及时收到响应。在这两种情况下,我都会取消接收超时,因为它会重复。
这非常简单,但我只是想展示一种在两个参与者之间来回切换的方法,就像你的例子所展示的那样。
https://stackoverflow.com/questions/20673091
复制相似问题