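I'm trying to get fault-tolerant behavior in Akka actors. I'm working on some code that depends on the actors in the system staying available for a long-running job. I'm finding that my processing stops after a couple of hours (it should take about 10 hours) and then nothing much happens. I believe my actors are not recovering from exceptions. How do I set up fault tolerance for Akka actors?

What do I need to do to get the actors to restart permanently? I expected to find the answer in this documentation: http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

I'm working with Akka 1.1.3 and Scala 2.9.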
```scala
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._

object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
    .setCorePoolSize(100)
    .setMaxPoolSize(100)
    .build
}

class TestActor(val name: Integer) extends Actor {
  self.lifeCycle = Permanent
  self.dispatcher = TestActor.dispatcher

  def receive = {
    case num: Integer => {
      if (num % 2 == 0)
        throw new Exception("This is a simulated failure")
      println("Actor: " + name + " Received: " + num)
      //Thread.sleep(100)
    }
  }

  override def postStop() {
    println("TestActor post Stop ")
  }

  //callback method for restart handling
  override def preRestart(reason: Throwable) {
    println("TestActor " + name + " restarting after shutdown because of " + reason)
  }

  //callback method for restart handling
  override def postRestart(reason: Throwable) {
    println("Restarting TestActor " + name + " after shutdown because of " + reason)
  }
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
  val testActors: List[ActorRef]
  val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
  self.lifeCycle = Permanent
  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
  val testActors: List[ActorRef]
  override def preStart = testActors foreach { self.startLink(_) }
  override def postStop = { System.out.println("postStop") }
}

object FaultTest {
  def main(args : Array[String]) : Unit = {
    println("starting FaultTest.main()")
    val numOfActors = 5
    val supervisor = actorOf(
      new TestActorManager with CyclicLoadBalancing {
        val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
      }
    )

    supervisor.start();
    println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)

    val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

    (1 until 200 toList) foreach { testActor ! _ }
  }
}
```
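This code sets up 5 actors behind a load balancer. They just print out the integers sent to them, except that they throw an exception on even numbers to simulate a failure. The integers 1 through 199 (`1 until 200`) are sent to these actors. I expected the odd numbers to be printed, but it seems that everything shuts down after a few failures on even numbers. Running this code with SBT produces the following output: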
```
[info] Running FaultTest
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info]
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM
```
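What I think is happening here is that the 5 actors start, the first 5 even numbers put them out of business, and none of them get restarted.

How can I change this code so that the actors recover from exceptions?

I expected this to print out all the odd numbers from 1 to 199. I thought each actor would fail on the even numbers but be restarted with its mailbox intact after the exception. I expected to see the println output from preRestart and postRestart. What needs to be configured in this code example to make these things happen?

Here are some additional assumptions about Akka and actors that may be behind my misunderstanding:

- I assume that an actor can be configured with a Supervisor or a faultHandler so that, when an exception is thrown during receive, it is restarted and remains available.
- I assume that the message an actor is processing is lost if the actor throws an exception during receive.
- I assume that preRestart() and postRestart() are called on the actor that threw the exception.

The example code is representative of what I'm trying to do and is based on "Why is my Dispatching on Actors scaled down in Akka?"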
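To make the assumptions above concrete, here is a small, dependency-free Scala model of the behavior I expected. It is a sketch, not Akka code: `Worker`, `SupervisionModel`, `SupervisionModelDemo` and `run` are hypothetical names, and a plain try/catch stands in for whatever the real supervisor does. In this model the failing message is dropped, `preRestart` runs on the old instance, `postRestart` runs on a fresh instance, and the rest of the mailbox is still processed, so every odd number gets printed.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for TestActor; NOT an Akka class.
class Worker(val name: Int, log: ListBuffer[String]) {
  // Same failure rule as TestActor: even numbers simulate a failure.
  def receive(num: Int): Unit = {
    if (num % 2 == 0) throw new Exception("simulated failure on " + num)
    log += "Actor: " + name + " Received: " + num
  }

  // Named after the Akka 1.x callbacks, but invoked only by the model below.
  def preRestart(reason: Throwable): Unit = log += "preRestart " + name
  def postRestart(reason: Throwable): Unit = log += "postRestart " + name
}

// Hypothetical model of the assumed supervision semantics.
object SupervisionModel {
  // Feeds the mailbox to the worker one message at a time. On an exception the
  // failing message is dropped, preRestart runs on the old instance, a fresh
  // instance replaces it, postRestart runs on the new one, and processing
  // continues with the next message.
  def run(mailbox: Seq[Int]): List[String] = {
    val log = ListBuffer[String]()
    var worker = new Worker(1, log)
    for (msg <- mailbox) {
      try worker.receive(msg)
      catch {
        case e: Exception =>
          worker.preRestart(e)
          worker = new Worker(worker.name, log)
          worker.postRestart(e)
      }
    }
    log.toList
  }
}

object SupervisionModelDemo {
  def main(args: Array[String]): Unit =
    SupervisionModel.run(1 until 20).foreach(println)
}
```

Running `SupervisionModelDemo` prints each odd number, with a preRestart/postRestart pair between consecutive odd numbers for the even message that failed. That is the kind of output I was hoping to get from the real actors.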
**Another code example**
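Here is another, simpler code example. I start a single actor that throws an exception on even numbers; there is no load balancer or anything else. I try to print out information about the actor. After sending the messages to the actor, the program waits about a minute, monitoring what happens, before it exits.

I expected this to print out the odd numbers, but it looks like the actor just sits there with messages in its mailbox.

Do I have the OneForOneStrategy set up wrong? Do I need to link the actor to something? Is this kind of configuration fundamentally misguided? Does the Dispatcher need to be set up for fault tolerance somehow? Could I be messing up the threads in the Dispatcher?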
```scala
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
  self.lifeCycle = Permanent
  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)

  def receive = {
    case num: Integer => {
      if (num % 2 == 0)
        throw new Exception("This is a simulated failure, where does this get logged?")
      println("Actor: " + name + " Received: " + num)
    }
  }

  override def postStop() {
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable) {
    println("TestActor " + name + " restarting after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable) {
    println("Restarting TestActor " + name + " after shutdown because of " + reason)
  }
}

object TestSingleActor {
  def main(args : Array[String]) : Unit = {
    println("starting TestSingleActor.main()")

    val testActor = Actor.actorOf(new SingleActor(1)).start()

    println("number of actors: " + registry.actors.size)
    printAllActorsInfo

    (1 until 20 toList) foreach { testActor ! _ }

    for (i <- 1 until 120) {
      Thread.sleep(500)
      printAllActorsInfo
    }
  }

  def printAllActorsInfo() = {
    registry.actors.foreach((a) =>
      println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
        .format(a.hashCode(), a.mailboxSize, a.isRunning, a.isShutdown, a.isBeingRestarted)))
  }
}
```
I'm getting output like this:
```
[info] Running TestSingleActor
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
... 117 more of these lines repeated ...
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
[info] == run ==
[success] Successful.
[info]
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM
```
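As a sanity check on the `mailbox 17` figure: 19 messages are sent (`1 until 20`), message 1 is printed, and message 2 throws. If the actor is never resumed after that exception, and if the failing message has already been taken out of the mailbox (both are assumptions), exactly 17 messages are left over. The hypothetical `StalledMailboxModel` below is a plain-Scala sketch of that arithmetic only. It is not a claim about what Akka does internally.

```scala
// Hypothetical model (not Akka code): a mailbox that is never processed
// again after the first uncaught exception.
object StalledMailboxModel {
  final case class Result(printed: List[Int], failedOn: Option[Int], leftInMailbox: Int)

  def run(messages: List[Int]): Result = {
    // Everything before the first even number is handled normally.
    val (handled, rest) = messages.span(_ % 2 != 0)
    rest match {
      // The failing message is assumed to be already dequeued, so it is not counted.
      case failing :: remaining => Result(handled, Some(failing), remaining.size)
      case Nil                  => Result(handled, None, 0)
    }
  }

  def main(args: Array[String]): Unit = {
    val r = run((1 until 20).toList)
    println("printed=" + r.printed + " failedOn=" + r.failedOn + " mailbox=" + r.leftInMailbox)
  }
}
```

Under these assumptions the model ends with `printed = List(1)`, a failure on message 2, and 17 messages left, which matches the numbers in the output above.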
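If I add a `Thread.sleep(100000)` at the end of `main()`, I get:

```
[info] Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 0 Received: 1
Actor: 4 Received: 3
Actor: 1 Received: 7
Actor: 1 Received: 9
```

and then the output pauses. I didn't wait for the application to exit, but nothing more appeared after 30-40 seconds. Also, if I take out the simulated failure, the numbers print very quickly, within 2 seconds.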