2013-11-23 135 views
0

我需要帮助的东西是粗体。阿卡匹配失败,并恢复

我有一个演员,飞多个喷HttpRequests,请求分页和演员确保它将结果顺序写入数据库(序列是重要的,以恢复爬网)。我解释这一点是因为我现在不想探索其他并发模式。演员需要从超时恢复而不重新启动。

在我的演员

我有以下几点:

  case f : Failure => { 
       system.log.error("faiure") 
       system.log.error(s"$f") 
       system.shutdown() 
      } 
      case f : AskTimeoutException => { 
       system.log.error("faiure") 
       system.log.error(s"$f") 
       system.shutdown() 
      } 
      case msg @ _ => { 

       system.log.error("Unexpected message in harvest") 
       system.log.error(s"${msg}") 
       system.shutdown() 
      } 

,但我不能正确地匹配:

[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Unexpected message in harvest 
[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Failure(akka.pattern.AskTimeoutException: Timed out) 

我分派如下所示:

abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val reqFut = (System.requester ? CrawlerRequest(node,Get(node.url))).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse])) 
    reqFut pipeTo self 
    } 


class CrawlerRequester extends Actor { 
    import context._ 
    val throttler = context.actorOf(Props(classOf[TimerBasedThrottler],System.Config.request_rate),"throttler") 
    throttler ! SetTarget(Some(IO(Http).actorRef)) 

    def receive : Receive = { 
    case CrawlerRequest(type_,request) => { 
     throttler forward request 
    } 
    } 
} 

一旦找到正确的匹配方式,就是无论如何,我可以得到我的手在超时发生的CrawlerRequest?它包含一些我需要弄清楚如何恢复的状态。

回答

1

需要键入失败案例类的完整路径(或者我猜想导入它)。

case f: akka.actor.Status.Failure => { 
       system.log.error("faiure") 
       system.log.error(s"${f.cause}") 
       system.shutdown() 
      } 

只是留下了与超时相关的请求。在点请求调度中似乎需要具有自定义失败处理程序的地图和管道。现在看看它。

以下将超时时间拖入角色。

case class CrawlerRequestTimeout(request: CrawlerRequest) 
abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val req = CrawlerRequest(node,Get(node.url)) 
    val reqFut = (System.requester ? req).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse])) 

    reqFut onFailure { 
     case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req) 
    } 
    reqFut pipeTo self 
    } 
} 

火柴的:

case timeout : CrawlerRequestTimeout => { 
       println("boom") 
       system.shutdown() 
      } 

需要寻找抑制例外,虽然的一种方式,它仍然射击。也许压制并不是真正的问题,需要验证。

不,抑制是一个问题,或者异常流入msg @ _,需要放入一个case类以吸收冗余失败消息。

好的,所以摆脱pipeto摆脱进入客户端演员的异常。这也是很多更容易阅读:d

abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val req = CrawlerRequest(node,Get(node.url)) 
    val reqFut = (System.requester ? req) 

    reqFut onFailure { 
     case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req) 
    } 
    reqFut onSuccess { 
     case r: HttpResponse => self ! CrawlerResponse(node,r) 
    } 
    } 
} 
3

如果使用pipeTo向通过tell发送消息作出响应会发生这种情况。

例如:

in actorA: actorB ! message 
in actorB: message => doStuff pipeTo sender 
in actorA: receives not 'scala.util.Failure', but 'akka.actor.Status.Failure' 

的附加逻辑在pipeTo是转变TryFailure成阿卡的演员Failureakka.actor.Status.Failure)。当你使用ask模式时,这可以正常工作,因为临时询问演员的句柄akka.actor.Status.Failure,但不适用于tell

希望这个简短的回答帮助:)

祝你好运!

0

如果我理解正确,您目前没有成功匹配AskTimeoutException

如果是这样,你应该匹配case Failure(AskTimeoutException) => ...而不是case f : AskTimeoutException => ...