2013-01-14 187 views
3

我正在尝试在akka演员之间建立一个消息处理过程,以表示主给工作人员一份工作,并密切关注它。我的问题是Akka演员工作与期货交接

  1. 就是我下面提出一个合理的方法,并
  2. 即使它不是 ,我想知道如何才能正确地与期货的组成 来完成,对于为了我未来的教育。

我想的过程是这样的

1)主机发送工作,工人与ask。它期望在5秒内得到答复,否则认为工人失去了机会,并且必须再次进入竞标。

import context.dispatcher 
implicit val timeout = Timeout(5 seconds) 
val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[Future[WorkCompleted]] 

2A)如果工人没有在5秒内做出反应,我想师傅将自身发送一个消息,说来重新分配工作。

self ! WorkAllocationFailed(work, worker) 

2B)如果工人没有响应,那么它给了我们一个未来[WorkCompleted。我想等待未来完成,例如2分钟。

3a)的如果未来[WorkCompleted]未能在超时时间内完成,然后重新分配工作

self ! WorkFailed(work, worker) 

3b)的如果未来[WorkCompleted]成功然后收集结果

我我试图创建这个逻辑,但是我陷入了嵌套的onComplete混乱,我不知道如何处理Future [WorkCompleted]的超时。我尝试阅读Akka 2.10 Futures docs,但无法找出解决方案。

回答

2

我恩德雷的答案达成一致 - 都非常好点。

如何:

1)安排一个消息给自己的超时时间(使用system.scheduler.scheduleOnce

2)发送工作邮件给工人使用常规tell

3A)如完成的工作在超时消息之前回来,取消预定的工作并使用步骤1和2重新分配工作

3b)如果完成的工作在超时消息th要么忽略它,要么取消重新分配的工作。

一个地方的期货可能对工人有帮助,尤其是在工作需要很长时间或阻碍工作时。工作人员可以使用未来来完成工作并保持可用于处理更多传入消息,例如取消工作。

2

一般的想法,即你有一位将工作交给工作人员池的主人是一种健全的模式。

另一方面,当系统的所有部分都已经是演员时,我不建议使用Futures。不要使用提问来提交工作,你可以通过告诉发送它。主人然后可以定期检查超时工作并重新提交。

另外,在actor的主体中调用onComplete是非常危险的,因为它在一个潜在的不同线程上执行。与演员沟通的安全方式是通过信息传递。如果你有一个未来,并且你希望在未来完成时在演员中完成一些事情,那么最好使用管道模式。

你的代码段中还有一个小错误。如果你的工人演员与WorkCompleted回复,那么这就是你真正想要的行:

val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[WorkCompleted]