2011-08-16

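I'm trying to get fault-tolerant behavior from Akka actors. I'm working on code that depends on the actors in the system staying available for long-running processing. I've found that my processing stops after a few hours (it should take about 10 hours) and not much seems to be happening after that. I believe my actors are not recovering from exceptions. How do I set up fault tolerance for Akka actors?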

What do I need to do to have the actors restart permanently? I would expect this to be possible based on this documentation: http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

I'm working with Akka 1.1.3 and Scala 2.9.

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor " + name + " restarting after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restarting TestActor " + name + " after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

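This code sets up 5 actors behind a load balancer. Each actor just prints the integers sent to it, except that it throws an exception on even numbers to simulate a failure. The integers 1 through 199 are sent to these actors. I expected the odd numbers to be printed, but after a few failures on even numbers everything seems to shut down. Running this code with SBT produces this output: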

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

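I think what is happening here is that the 5 actors start, the first 5 even numbers put them all out of business, and none of them gets restarted.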
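The manager above passes `OneForOneStrategy(List(classOf[Exception]), 1000, 5000)`, which, as I read the Akka 1.x fault-tolerance docs, allows up to 1000 restarts within a 5000 ms window (the `MaximumNumberOfRestartsWithinTimeRangeReached` import hints at what happens when that budget runs out). Only 99 of the 199 messages are even, so the restart budget by itself should not be exhausted after five failures. The plain-Scala sketch below models such a sliding-window budget; `RestartBudget` and `tryRestart` are hypothetical names, and this is not Akka's implementation.

```scala
import scala.collection.mutable

// Hypothetical model of a "max N restarts within T ms" budget, the idea behind
// the last two OneForOneStrategy arguments. Not Akka's actual code.
final class RestartBudget(maxRestarts: Int, withinMillis: Long) {
  private val recent = mutable.Queue.empty[Long] // timestamps of recent restarts

  /** Records a restart at `now` and returns true if it fits in the budget. */
  def tryRestart(now: Long): Boolean = {
    // Forget restarts that have slid out of the time window.
    while (recent.nonEmpty && now - recent.head >= withinMillis) recent.dequeue()
    if (recent.size >= maxRestarts) false // budget exhausted: give up on the actor
    else { recent.enqueue(now); true }
  }
}

object RestartBudgetDemo {
  def main(args: Array[String]): Unit = {
    val budget = new RestartBudget(maxRestarts = 3, withinMillis = 1000)
    val decisions = List(0L, 100L, 200L, 300L, 1200L).map(budget.tryRestart)
    println(decisions) // List(true, true, true, false, true)
  }
}
```

The fourth restart at 300 ms is refused because three restarts already happened inside the 1000 ms window; by 1200 ms all three have aged out, so restarting is allowed again.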

How can I change this code so that the actors recover from exceptions?

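I expected this to actually print all the odd numbers between 1 and 200. I thought each actor would fail on the even numbers but, after each exception, be restarted with its mailbox intact. I expected to see the println output from preRestart and postRestart. What needs to be configured in this code sample to make these things happen?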

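Here are some additional assumptions about Akka and actors that may be causing my misunderstanding. I assume an actor can be configured with a supervisor or a faultHandler so that, when an exception is thrown during receive, it is restarted and remains available. I assume that the message being processed when receive throws an exception is lost. I assume that preRestart() and postRestart() are called on the actor that throws the exception.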
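To make those assumed semantics concrete, here is a toy, single-threaded model in plain Scala, not Akka code: when `receive` throws, the failing message is dropped, `preRestart` runs on the failed instance, a fresh instance is built from a factory, `postRestart` runs on the new instance, and the remaining mailbox is delivered to it. `Worker`, `ToySupervisor` and `deliver` are hypothetical names chosen for the sketch.

```scala
import scala.collection.mutable

// Hypothetical toy model of restart-on-failure semantics; not Akka's API.
trait Worker {
  def receive(msg: Int): Unit
  def preRestart(reason: Throwable): Unit = ()
  def postRestart(reason: Throwable): Unit = ()
}

// Rebuilds the worker from `factory` whenever receive throws. The failing
// message is dropped; later messages go to the fresh instance.
final class ToySupervisor(factory: () => Worker) {
  private var current: Worker = factory()

  def deliver(mailbox: Seq[Int]): Unit =
    mailbox.foreach { msg =>
      try current.receive(msg)
      catch {
        case e: Exception =>
          current.preRestart(e)  // called on the instance that failed
          current = factory()    // "restart" = replace it with a fresh instance
          current.postRestart(e) // called on the new instance
      }
    }
}

object ToySupervisorDemo {
  val received = mutable.ListBuffer.empty[Int]
  val log = mutable.ListBuffer.empty[String]

  def main(args: Array[String]): Unit = {
    val sup = new ToySupervisor(() => new Worker {
      def receive(msg: Int): Unit = {
        if (msg % 2 == 0) throw new Exception("simulated failure")
        received += msg
      }
      override def preRestart(reason: Throwable): Unit = log += "preRestart"
      override def postRestart(reason: Throwable): Unit = log += "postRestart"
    })
    sup.deliver(1 until 20)
    println(received.mkString(" ")) // 1 3 5 7 9 11 13 15 17 19
    println(log.size)               // 18
  }
}
```

Each of the nine even numbers triggers one preRestart/postRestart pair, and every odd number still reaches a live instance, which is the behavior the question expects.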

The sample code is representative of what I'm trying to do and is based on "Why is my Dispatching on Actors scaled down in Akka?"

**Another code sample**

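Below is another, simpler code sample. I start a single actor that throws an exception on even numbers. There is no load balancer or anything else. I try to print out information about the actor. After sending the messages to the actor, I wait about a minute before exiting the program and watch what happens.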

I expected this to print the odd numbers, but it looks like the actor just sits there with the messages in its mailbox.

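Do I have the OneForOneStrategy set up wrong? Do I need to link the actor to something? Is this configuration fundamentally misguided? Does the Dispatcher need to be set up for fault tolerance? Could I be messing up the threads in the Dispatcher?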

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor " + name + " restarting after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restarting TestActor " + name + " after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

I'm getting output like this:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeated ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

Answers


The problem was my akka.conf file. I was using the reference 1.1.3 akka.conf file, except for the line that configures the event handlers.

Mine (broken):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

Reference 1.1.3 (the one that works):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 

With my event-handler configuration line, the actor restarts do not happen. With the reference 1.1.3 line, the restarts happen wonderfully.
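One way to picture why a logging setting could stop restarts is a toy mailbox drained by a single thread, where each failure is handed to an error reporter before processing continues. This is a hypothetical plain-Scala model (`MailboxModel`, `drain` and `reportError` are invented names), not Akka's dispatcher, and it is only an assumption that Akka 1.1.3 failed in this way. In the model, if the reporter itself throws, draining stops and the rest of the messages stay queued.

```scala
import scala.collection.mutable

// Hypothetical model of a mailbox drained by one thread; NOT Akka's dispatcher.
// It shows how a failing error-reporting path can strand queued messages.
object MailboxModel {
  final case class Result(processed: List[Int], leftInMailbox: Int)

  def drain(messages: Seq[Int], reportError: Throwable => Unit): Result = {
    val mailbox = mutable.Queue(messages: _*)
    val processed = mutable.ListBuffer.empty[Int]
    try {
      while (mailbox.nonEmpty) {
        val msg = mailbox.dequeue()
        try {
          if (msg % 2 == 0) throw new Exception("simulated failure")
          processed += msg
        } catch {
          // The failure goes to the reporter; if the reporter itself throws,
          // that exception escapes the loop and draining stops.
          case e: Exception => reportError(e)
        }
      }
    } catch {
      case _: Exception => () // the "dispatcher thread" gives up
    }
    Result(processed.toList, mailbox.size)
  }
}
```

With a reporter that works (for example `e => println(e.getMessage)`), all ten odd numbers from `1 until 20` are processed and nothing is left. With a reporter that throws, only 1 is processed and 17 messages remain, which happens to match the `has mailbox 17` lines in the second sample's output. That match is suggestive, not proof of what Akka actually did.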

I had made this change based on these instructions: http://akka.io/docs/akka/1.1.3/general/slf4j.html

So by dropping the suggestion from that page and going back to the 1.1.3 reference akka.conf, I was able to get fault-tolerant actors.


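I believe your problem is that after the messages are sent, you are not doing anything to keep your asynchronous application alive, so the main thread exits and takes everything down with it.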


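If I add a Thread.sleep(100000) at the end of main(), I get: '[info] Loading config [akka.conf] from the application classpath. Number of Actors: 5 Actor: 0 Received: 1 Actor: 4 Received: 3 Actor: 1 Received: 7 Actor: 1 Received: 9' and then the output stalls. I didn't wait for the application to exit, but after 30 to 40 seconds nothing more had appeared. Also, if I remove the simulated failure, the numbers print very quickly, within 2 seconds.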
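In this thread the fixed Thread.sleep did not help, and the fix reported in the first answer was the event-handler line. Still, a fixed sleep only guesses how long the asynchronous work will take. A more deterministic pattern, sketched here with plain `java.util.concurrent` rather than Akka (`WaitForWork` and its `run` method are hypothetical names), is to have main block on a `CountDownLatch` that every unit of work counts down, whether it succeeds or fails:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch with plain java.util.concurrent, not Akka: main waits on a latch
// instead of a fixed Thread.sleep, so it exits only after all work is handled.
object WaitForWork {
  def run(messages: Seq[Int]): Int = {
    val pool = Executors.newFixedThreadPool(5)
    val done = new CountDownLatch(messages.size)
    val succeeded = new AtomicInteger(0)
    messages.foreach { n =>
      pool.execute(new Runnable {
        def run(): Unit =
          try {
            if (n % 2 == 0) throw new Exception("simulated failure")
            succeeded.incrementAndGet()
          } catch {
            case _: Exception => () // failure absorbed; the pool thread keeps working
          } finally {
            done.countDown() // count down on success and on failure alike
          }
      })
    }
    val finished = done.await(30, TimeUnit.SECONDS) // bounded wait instead of a guess
    pool.shutdown()
    if (!finished) sys.error("timed out waiting for work")
    succeeded.get()
  }

  def main(args: Array[String]): Unit =
    println(WaitForWork.run(1 until 200)) // 100 odd numbers in 1..199
}
```

Counting down in `finally` matters: if only successful messages counted down, a single failure would leave main waiting until the timeout.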