2014-09-28 54 views
2

我有一个ActorA从输入流中读取消息并将消息发送给一组ActorB's。当ActorA到达输入流的末尾时,它清除其资源,向ActorB广播完成消息,并关闭它。我如何确定所有演员都收到广播消息

我有大约12个ActorB发送消息给一组ActorC's。当ActorB从ActorA收到完成消息后,它清除其资源并关闭自己,除了最后幸存的ActorB在ActorC关闭之前向ActorC广播完成消息之外。

我有大约24个ActorC发送消息给单个ActorD。与ActorB类似,当每个ActorC获得完成消息时,它清除其资源并关闭自身,除了最后幸存的向ActorD发送完成消息的ActorC之外。

当ActorD获取完成消息时,它清除其资源并关闭它。

最初我有ActorB和ActorC在收到它时立即传播完成消息,但这可能会导致ActorC在所有ActorB完成处理队列之前关闭;同样,在ActorC完成队列处理之前,ActorD可能会关闭。

我的解决方案是使用在所述ActorB的

class ActorB(private val actorCRouter: ActorRef, 
      private val actorCount: AtomicInteger) extends Actor { 
    private val init = { 
    actorCount.incrementAndGet() 
    () 
    } 

    def receive = { 
    case Done => { 
     if(actorCount.decrementAndGet() == 0) { 
     actorCRouter ! Broadcast(Done) 
     } 
     // clean up resources 
     context.stop(self) 
    } 
    } 
} 

ActorC之间共享使用类似的代码的AtomicInteger,每个ActorC共享一个的AtomicInteger。

目前所有的角色都是在一个web服务方法中初始化的,下游ActorRef被传入上游角色的构造函数中。

是否有一个首选的方法来做到这一点,例如:使用调用Akka方法而不是AtomicInteger?


编辑:我考虑以下作为一个可能的替代:当一个演员接收Done消息它设置接收超时时间为5秒(该程序将需要一个多小时的运行,所以延迟清理/几秒钟关机不会影响性能);当演员获得ReceiveTimeout时,它向下游演员广播完成,清理并关闭。 (对于ActorB和ActorC路由器使用的是SmallestMailboxRouter)

class ActorB(private val actorCRouter: ActorRef) extends Actor { 

    def receive = { 
    case Done => { 
     context.setReceiveTimeout(Duration.create(5, SECONDS)) 
    } 

    case ReceiveTimeout => { 
     actorCRouter ! Broadcast(Done) 
     // clean up resources 
     context.stop(self) 
    } 
    } 
} 
+1

有这样的演员之间的任何形式的共享状态是一个非常糟糕的主意。单独的生命周期关注并确保您的参与者坚持单一责任原则。这样做会更容易。 – Ryan 2014-09-29 00:35:24

回答

1

共享actorCount相关行动者之间是不是做一件好事。 Actor只能使用自己的状态来处理消息。 如何为ActorB类型的演员拥有ActorBCompletionHanlder actor?所有ActorB都将引用ActorBCompletionHanlder actor。每当ActorB收到完成消息,它可以做必要的清理,并简单地将完成的消息传递给ActorBCompletionHanlder。 ActorBCompletionHanlder将维护状态变量以保持计数。每次它收到完成的消息,它可以简单地更新计数器。因为这只是这个actor的状态变量,所以不需要它是原子的,这样就不需要任何明确的锁定。 ActorBCompletionHanlder一旦收到上次完成的消息,就会向ActorC发送完成的消息。 这种方式共享activeCount不是演员之间,但只能由ActorBCompletionHanlder管理。同样的事情可以重复其他类型。

A-> B的 - > BCompletionHanlder - > C'S - > CCompletionHandler - > d

其他办法可以是对演员的埃维相关的一组一个监测演员。在监视器上使用watch api和儿童终止事件,您可以选择决定在收到上次完成的消息后应该执行的操作。

val child = context.actorOf(Props[ChildActor]) 
    context.watch(child) 

    case Terminated(child) => { 
     log.info(child + " Child actor terminated") 
    }