2012-03-05 46 views
21

我已经实现了一个使用Akka及其Java API UntypedActor的Actor系统。其中,一名演员(类型A)根据需要动态地启动其他演员(类型B),使用getContext().actorOf(...);。那些B演员会做一些A不再真正关心的计算。但是我想知道:是否有必要在完成时清理B型的演员?如果是这样,怎么样?Akka:完成动态创建的演员时需要清理吗?

  • 通过让B参与者在完成时调用getContext().stop(getSelf())
  • 通过让B演员完成时调用getSelf().tell(Actors.poisonPill());? [这是我现在使用的]。
  • 什么都不做?
  • ...??

这篇文档不清楚,或者我忽略了它。我对Scala有一些基本的了解,但Akka的源代码并不完全是入门级的东西......

回答

23

你所描述的是每个“请求”(在A的上下文中定义)创建的单一目的actor,它处理一系列事件,然后完成,对吗?这是绝对好的,你是正确的关闭这些:如果你不这样做,他们会积累一段时间,你会遇到内存泄漏。做到这一点的最佳方式是你提到的第一个可能性(最直接的),但第二个也是可以的。

有点背景:演员在他们的父母身上注册以便识别(例如在远程处理中需要但在其他地方),并且这种注册使他们免于被垃圾收集。 OTOH,每个父母都有权访问其创建的孩子,因此不会自动终止(即通过Akka),这意味着需要在用户代码中明确关闭。

+0

http:// stackoverflow。com/questions/23066264/can-wrapping-akka-actors-in-class-actors-caused-memory-leaks < - 相关问题 – 2014-04-14 17:33:55

-3

默认情况下,参与者不会消耗太多内存。如果应用程序打算稍后使用actor b,则可以使它们保持活动状态。如果没有,你可以通过poisonpill关闭它们。只要你的演员没有资源,留下一个演员应该没问题。

+4

但是,正如罗兰指出的那样,演员不会被垃圾收集,因此会随着时间累积=>内存泄漏。 – 2012-03-06 16:21:53

0

除了Roland Kuhn的回答,不是为每个请求创建一个新角色,而是创建一组预定义的共享相同调度器的角色,或者使用将请求分发给角色池的路由器。

Balancing Pool Router,例如,可以让你有一组固定的一特定类型的份额的行动者相同的邮箱的:

akka.actor.deployment { 
    /parent/router9 { 
    router = balancing-pool 
    nr-of-instances = 5 
    } 
} 

阅读文档上dispatchersrouting为进一步的细节。

0

我正在剖析(visualvm)来自AKKA文档的示例集群应用程序之一,并且我在每个GC期间看到清理每个请求演员的垃圾收集。无法完全理解使用后显式杀死actor的建议。我的演员系统和演员由SPRING IOC容器管理,我使用spring extension直接演员制作人来创建演员。 “聚合器”actor正在每个GC上收集垃圾,我确实监视了可视VM中的实例数量。

@Component 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class StatsService extends AbstractActor { 

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); 

    @Autowired 
    private ActorSystem actorSystem; 

    private ActorRef workerRouter; 

    @Override 
    public void preStart() throws Exception { 
     System.out.println("Creating Router" + this.getClass().getCanonicalName()); 
     workerRouter = getContext().actorOf(SPRING_PRO.get(actorSystem) 
      .props("statsWorker").withRouter(new FromConfig()), "workerRouter"); 
     super.preStart(); 
    } 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
      .match(StatsJob.class, job -> !job.getText().isEmpty(), job -> { 
       final String[] words = job.getText().split(" "); 
       final ActorRef replyTo = sender(); 
       final ActorRef aggregator = getContext().actorOf(SPRING_PRO.get(actorSystem) 
        .props("statsAggregator", words.length, replyTo)); 

       for (final String word : words) { 
        workerRouter.tell(new ConsistentHashableEnvelope(word, word), 
         aggregator); 
       } 
      }) 
      .build(); 
    } 
}