2010-03-17 229 views
1

我想使用scala actors来并行化代码。这是我第一次与演员合作的真实代码,但我在C中使用Java Mulithreading和MPI方面有一些经验。但是我完全失去了。scala actors之间的依赖关系

欲实现的工作流是一个圆形的管道,并且可以描述为以下:

  • 每个工人演员具有到另一个基准,从而形成了一圈
  • 有一个协调演员,其可以通过发送消息StartWork()
  • 当工人接收StartWork()消息触发的计算,它在本地处理的一些东西,并发送DoWork(...) MESSA ge给它在圈子里的邻居。
  • 邻居做了一些其他的东西,并发送一个DoWork(...)消息到自己的邻居。
  • 这一直持续到最初的工作人员收到DoWork()消息。
  • 协调员可以发送一个GetResult()消息给初始worker并等待回复。

问题是协调器应该只在数据准备就绪时收到结果。 工作人员在回复GetResult()消息之前如何等待作业返回?

为了加速计算,任何员工可以随时收到StartWork()

这是我第一次尝试伪实现工人:

class Worker(neighbor: Worker, numWorkers: Int) { 
    var ready = Foo() 
    def act() { 
    case StartWork() => { 
     val someData = doStuff() 
     neighbor ! DoWork(someData, numWorkers-1) 
     } 
    case DoWork(resultData, remaining) => if(remaining == 0) { 
     ready = resultData 
     } else { 
     val someOtherData = doOtherStuff(resultData) 
     neighbor ! DoWork(someOtherData, remaining-1) 
     } 
    case GetResult() => reply(ready) 
    } 
} 

在协调方面:

worker ! StartWork() 
val result = worker !? GetResult() // should wait 

回答

3

首先,你显然需要对什么是单件的一些标识的工作,这样GetResult可以得到正确的结果。我想最明显的解决方案是让你的角色保持结果的Map任何等待干将Map

class Worker(neighbor: Worker, numWorkers: Int) { 
    var res: Map[Long, Result] = Map.empty 
    var gets: Map[Long, OutputChannel[Any]] = Map.empty 
    def act() { 
    ... 
    case DoWork(id, resultData, remaining) if remaining == 0 => 
     res += (id -> resultData) 
     gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready 
     gets -= id //clear out getter map now? 
    case GetResult(id) if res.isDefinedAt(d) => //result is ready 
     reply (res(id)) 
    case GetResult(id) => //no result ready 
     gets += (id -> sender) 
    } 
} 

注:的匹配条件使用if可以使信息处理有点清晰

+0

谢谢你的回答。我会尽快尝试。顺便说一句,我认为,如果在'=>'之后在这种情况下是正确的。在匹配参数时,我不会寻找警卫,但我希望根据价值有两种不同的行为。也许我应该使用两个不同的警卫的“案例”条目。 – paradigmatic

+0

哦,是的。所以是 - 我正在读'=>'在其他地方 –

1

一个替代办法是这样的:该作品

class Worker(neighbor: Worker, numWorkers: Int) { 
    var ready = Foo() 
    def act() { 
    case StartWork() => { 
     val someData = doStuff() 
     neighbor ! DoWork(someData, numWorkers-1) 
     } 
    case DoWork(resultData, remaining) => if(remaining == 0) { 
     ready = resultData 
     react { 
      case GetResult() => reply(ready) 
     } 
     } else { 
     val someOtherData = doOtherStuff(resultData) 
     neighbor ! DoWork(someOtherData, remaining-1) 
     } 
    } 
} 

后完成后,此工作人员将被卡住,直到收到消息GetResult。另一方面,协调员可以立即发送GetResult,因为它将保留在邮箱中,直到工作人员收到它。

+0

真的很好。我没有意识到可以嵌入反应块。但是,这不是我的问题的解决方案,因为(如果我理解正确),工作人员将被困在正在等待'GetResult()'的内部'react'中,并且将无法成为管道的一部分。 – paradigmatic

+0

@paradigmatic只有当结果准备就绪时,它才会停下来等待'GetResult',但我的观点确实表明您可以级联反应。 –