2012-07-07 73 views
8

我有一个未知数量的作业要由已知(当然)数量的演员执行。演员完成工作后,初始工作数量可能会增加。也就是说,一个演员在完成任务后可能会添加一个新的工作来执行。如何知道演员是否空闲

我处理这个问题的方式是让每个角色在完成作业时发回一条消息给主人,不仅是执行结果,还有一个'标志'表示演员现在处于空闲状态。主人有一个工作队列和一个空闲的演员队列,每当一个演员发送一个'工作完成信息'时,主人将检查是否还有其他东西要做那个演员......等等等等,直到作业的队列是空的,空闲的队列已满......那时我关闭了系统。这里没有太多的监督,所以我觉得我没有正确地做...

我不使用路由器,因为我找不到一种方法来查询闲置的演员路由器,所以我的问题是:

什么是“正确的”方式来处理我上面在Akka中描述的情况?

回答

7

你应该看看Akka's routing capabilites.SmallestMailboxRouter可能是你在找什么。

作为替代方案,您可以根据需要创建演员,即对于每个任务,都会动态创建新演员。中央演员跟踪所有当前活动的演员。工人角色完成后,它会自行发送一个PoisonPill并通知主人有关其关闭(主动或通过标准Terminate消息,Akka将发送给监督角色)。一旦没有更多的主动角色,即没有更多的任务,控制器角色就会关闭系统。看完后评论

增加: 看看的SmallestMailboxLike,Scala的特质通过SmallestMailboxRouter混合来源。警告:您应该具备Scala的基本知识。但是,这通常是一个好主意,无论如何,如果你想用阿卡...的方法isProcessingMessage(ActorRef)可以理解为isNotIdle(ActorRef)

// Returns true if the actor is currently processing a message. 
// It will always return false for remote actors. 
// Method is exposed to subclasses to be able to implement custom 
// routers based on mailbox and actor internal state. 
protected def isProcessingMessage(a: ActorRef): Boolean = a match { 
    case x: LocalActorRef ? 
    val cell = x.underlying 
    cell.mailbox.isScheduled && cell.currentMessage != null 
    case _ ? false 
} 

// Returns true if the actor currently has any pending messages 
// in the mailbox, i.e. the mailbox is not empty. 
// It will always return false for remote actors. 
// Method is exposed to subclasses to be able to implement custom 
// routers based on mailbox and actor internal state. 
protected def hasMessages(a: ActorRef): Boolean = a match { 
    case x: LocalActorRef ? x.underlying.mailbox.hasMessages 
    case _    ? false 
} 
+0

我经历了Akka的路由,这就是为什么我决定不使用路由器(也许我应该写我自己的),因为我无法查询路由器关于工人的闲置状态(也许我错了,你能指出我在哪里看看?) – DodoTheDeadDoo 2012-07-07 20:17:19

+0

不是故意按下输入...无论如何,我会研究毒丸解决方案,就像我现在正在做的一样,我不杀死一个工人,我只是将它设置为空闲状态,以便它可以被重新利用......当我没有更多的任务时而且所有的工人都比我知道我已经完成了。 – DodoTheDeadDoo 2012-07-07 20:21:24

+0

所以我会写我自己的路由器:) – DodoTheDeadDoo 2012-07-11 14:24:00

1

另一种策略可以使用BalancingDispatcher和RoundRobinRouter(作为一个演员“池” )。从阿卡文档:

BalancingDispatcher 
# This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. 




# All the actors share a single Mailbox that they get their messages from. 

It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. 

# Sharability: Actors of the same type only 

# Mailboxes: Any, creates one for all Actors 

# Use cases: Work-sharing 

定义您的调度员application.conf或编程方式加载它在启动时。

private final static Config akkaConfig = ConfigFactory.parseString(

      "my-dispatcher.type = BalancingDispatcher \n" + 
      "my-dispatcher.executor = fork-join-executor \n" + 
      "my-dispatcher.fork-join-executor.parallelism-min = 8 \n" + 
      "my-dispatcher.fork-join-executor.parallelism-factor = 3.0 \n" + 
      "my-dispatcher.fork-join-executor.parallelism-max = 64 " 
); 

然后为路由定义路由器和调度器。

getContext().actorOf(new Props(MyActor.class).withRouter(new RoundRobinRouter(10)).withDispatcher("my-dispatcher"), "myActor"); 

那么路由器将简单地去“分配”的消息,并分发器将运行选定的演员(和它实现工作窃取以及)

-1

平衡调度程序将只使用一个邮箱的所有创建的演员都是用BalancingDispatcher创建的。所以它会让你的工作变得简单。