2
我想使用标准scala.actors包为Scala设计一个调度员 - 工作者演员模式。斯卡拉调度员 - 工人演员模式
调度员接收来自java.util.concurrent.LinkedBlockingQueue
的工作并将其发送给工作者参与者进行处理。当所有的工作都完成后,调度员应该告诉每个工人要退出,然后它也应该退出。这里是我想出的代码,但它挂起,当所有的工作都做完了(我认为是即将在调度队列'GiveMeWork
消息):
import java.util.concurrent.LinkedBlockingQueue
import scala.actors.Actor
object Dispatcher
extends Actor {
println("Dispatcher created")
def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())
loop {
react {
case 'GiveMeWork =>
// println("Worker asked for work")
val (time, i) = workQueue.take()
if (time == 0) {
println("Quitting time")
workers.foreach(_ !? 0L)
} else {
println("Arrival at dispatcher: i: " + i + " dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
sender ! time
}
case 'Quit =>
println("Told to quit")
sender ! 'OffDuty
exit()
}
}
}
}
class Worker(id: Int)
extends Actor {
println("Worker(" + id + ") created")
var jobs = 0
def act() {
Dispatcher ! 'GiveMeWork
loop {
react {
case time: Long =>
if (time == 0) {
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! 'OffDuty
exit()
} else {
println("Arrival at worker(" + id + "): dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
Thread.sleep(id)
jobs += 1
Dispatcher ! 'GiveMeWork
}
}
}
}
}
val workQueue = new LinkedBlockingQueue[(Long, Int)](1000)
Dispatcher.start()
for (i <- 0 until 5000) {
Thread.sleep(1)
workQueue.put((System.nanoTime(), i))
}
workQueue.put((0L, 0))
println("Telling Dispatcher to quit")
Dispatcher !? 'Quit
我不喜欢使用的LinkedBlockingQueue,但我需要限制的等待执行的最大进程数。当我在实际代码中使用此模式时,由于生成工作的方法(目录递归下降),我得到了内存不足错误,添加的文件比它们可以处理的快。有关如何纯粹与演员合作的建议? – Ralph
@Ralph我已经更新了我的答案。 – senia