我有一个ActorA从输入流中读取消息并将消息发送给一组ActorB's。当ActorA到达输入流的末尾时,它清除其资源,向ActorB广播完成消息,并关闭它。我如何确定所有演员都收到广播消息
我有大约12个ActorB发送消息给一组ActorC's。当ActorB从ActorA收到完成消息后,它清除其资源并关闭自己,除了最后幸存的ActorB在ActorC关闭之前向ActorC广播完成消息之外。
我有大约24个ActorC发送消息给单个ActorD。与ActorB类似,当每个ActorC获得完成消息时,它清除其资源并关闭自身,除了最后幸存的向ActorD发送完成消息的ActorC之外。
当ActorD获取完成消息时,它清除其资源并关闭它。
最初我有ActorB和ActorC在收到它时立即传播完成消息,但这可能会导致ActorC在所有ActorB完成处理队列之前关闭;同样,在ActorC完成队列处理之前,ActorD可能会关闭。
我的解决方案是使用在所述ActorB的
class ActorB(private val actorCRouter: ActorRef,
private val actorCount: AtomicInteger) extends Actor {
private val init = {
actorCount.incrementAndGet()
()
}
def receive = {
case Done => {
if(actorCount.decrementAndGet() == 0) {
actorCRouter ! Broadcast(Done)
}
// clean up resources
context.stop(self)
}
}
}
ActorC之间共享使用类似的代码的AtomicInteger,每个ActorC共享一个的AtomicInteger。
目前所有的角色都是在一个web服务方法中初始化的,下游ActorRef被传入上游角色的构造函数中。
是否有一个首选的方法来做到这一点,例如:使用调用Akka方法而不是AtomicInteger?
编辑:我考虑以下作为一个可能的替代:当一个演员接收Done消息它设置接收超时时间为5秒(该程序将需要一个多小时的运行,所以延迟清理/几秒钟关机不会影响性能);当演员获得ReceiveTimeout时,它向下游演员广播完成,清理并关闭。 (对于ActorB和ActorC路由器使用的是SmallestMailboxRouter)
class ActorB(private val actorCRouter: ActorRef) extends Actor {
def receive = {
case Done => {
context.setReceiveTimeout(Duration.create(5, SECONDS))
}
case ReceiveTimeout => {
actorCRouter ! Broadcast(Done)
// clean up resources
context.stop(self)
}
}
}
有这样的演员之间的任何形式的共享状态是一个非常糟糕的主意。单独的生命周期关注并确保您的参与者坚持单一责任原则。这样做会更容易。 – Ryan 2014-09-29 00:35:24