2015-04-22 56 views
7

我正在使用Spray应用程序中的ask模式调用Actor,并将结果作为HTTP响应返回。我将演员的失败映射到自定义错误代码。在发生故障时解决Akka期货问题

val authActor = context.actorOf(Props[AuthenticationActor]) 

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user => 
    complete(StatusCodes.OK, user) 
} 

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = { 
onComplete(f) { 
    case Success(value: T) => cb(value) 
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage) 
    case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.") 
    //In reality this returns a custom error object. 
} 
} 

这正常工作时authActor发送故障,但如果authActor抛出一个异常,没有任何反应,直到问超时完成。例如:

override def receive: Receive = { 
    case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token") 
} 

我知道阿卡文档说

不同,需要发送一个失败消息发送者例外完成未来。 这不会自动完成当一个actor在处理消息时抛出一个异常。

但是鉴于我使用了很多的喷射布线演员和服务演员之间的接口,我宁愿不用每个孩子演员的try/catch包装接收部分。是否有更好的方法来实现自动处理子actor中的异常,并在发生异常时立即解决未来?

编辑:这是我目前的解决方案。然而,为每个孩子演员做这件事都很麻烦。

override def receive: Receive = { 
case default => 
    try { 
    default match { 
     case _ => throw new ServiceException("")//Actual code would go here 
    } 
    } 
    catch { 
    case se: ServiceException => 
     logger.error("Service error raised:", se) 
     sender ! Failure(se) 
    case ex: Exception => 
     sender ! Failure(ex) 
     throw ex 
    } 
} 

这样,如果它是一个预期的错误(即ServiceException),它通过创建失败来处理。如果它是意外的,它会立即返回一个失败,以便未来得到解决,但是会抛出异常,因此它仍然可以由SupervisorStrategy处理。

+0

那么......抛出异常之前发送失败消息。 –

+0

这就是我不想做的 - 我说**我宁愿不用每个孩子演员的try/catch **包装接收部分。这是一个玩具的例子,我完全有可能不控制抛出异常的位置。 –

+0

呃......你知道...... resilent分布式系统的基本路线之一就是“明确地制造错误”。想想可能发生的各种错误...让他们明确。如果你可以有“TypeSafe”错误...更好。 –

回答

13

如果你想要的方式来提供自动在意外的异常的情况下返回给发送者的响应发送,那么这样的事情可能会为你工作:

trait FailurePropatingActor extends Actor{ 
    override def preRestart(reason:Throwable, message:Option[Any]){ 
    super.preRestart(reason, message) 
    sender() ! Status.Failure(reason) 
    } 
} 

我们覆盖preRestart和传播失败作为Status.Failure返回发件人,这将导致上游Future失败。另外,在这里拨打super.preRestart这很重要,因为这是孩子停止发生的地方。在演员使用,这看起来是这样的:

case class GetElement(list:List[Int], index:Int) 
class MySimpleActor extends FailurePropatingActor { 
    def receive = { 
    case GetElement(list, i) => 
     val result = list(i) 
     sender() ! result 
    } 
} 

如果我打电话给这个演员的一个实例,像这样:

import akka.pattern.ask 
import concurrent.duration._ 

val system = ActorSystem("test") 
import system.dispatcher 
implicit val timeout = Timeout(2 seconds) 
val ref = system.actorOf(Props[MySimpleActor]) 
val fut = ref ? GetElement(List(1,2,3), 6) 

fut onComplete{ 
    case util.Success(result) => 
    println(s"success: $result") 

    case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}") 
    ex.printStackTrace()  
}  

然后它会正常打我Failure块。现在,当Futures不参与扩展该特质的演员时,该基本特征中的代码运行良好,就像这里的简单演员。但是如果您使用Future s,则需要小心,因为在Future中发生的例外情况不会导致演员重新启动,并且在preRestart中,sender()的呼叫也不会返回正确的参考,因为演员已经进入下一条消息。像这样的演员表明问题:

class MyBadFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 

    def receive = { 
    case GetElement(list, i) => 
     val orig = sender() 
     val fut = Future{ 
     val result = list(i) 
     orig ! result 
     }  
    } 
} 

如果我们在前面的测试代码中使用这个演员,我们总是在失败的情况下超时。为了减轻这一点,你需要管期货的结果返回给像这样发件人:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut pipeTo sender() 
    } 
} 

在这种特殊情况下,演员本身不会重新启动,因为它并没有遇到未捕获的异常。现在,如果你的演员所需要的未来后做一些额外的处理,你可以管回自我,明确当你得到一个Status.Failure失败:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2 

    case Status.Failure(ex) => 
     throw ex 
    } 
} 

如果这种行为变得普遍,你可以把它提供给任何演员需要它是这样的:

trait StatusFailureHandling{ me:Actor => 
    def failureHandling:Receive = { 
    case Status.Failure(ex) => 
     throw ex  
    } 
} 

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = myReceive orElse failureHandling 

    def myReceive:Receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2   
    } 
} 
+1

这看起来不错,谢谢! –