2012-12-06 93 views
2

我想使用标准scala.actors包为Scala设计一个调度员 - 工作者演员模式。斯卡拉调度员 - 工人演员模式

调度员接收来自java.util.concurrent.LinkedBlockingQueue的工作并将其发送给工作者参与者进行处理。当所有的工作都完成后,调度员应该告诉每个工人要退出,然后它也应该退出。这里是我想出的代码,但它挂起,当所有的工作都做完了(我认为是即将在调度队列'GiveMeWork消息):

import java.util.concurrent.LinkedBlockingQueue 
import scala.actors.Actor 

object Dispatcher 
extends Actor { 
    println("Dispatcher created") 

    def act() { 
    val workers = (1 to 4).map(id => (new Worker(id)).start()) 

    loop { 
     react { 
     case 'GiveMeWork => 
      // println("Worker asked for work") 
      val (time, i) = workQueue.take() 
      if (time == 0) { 
      println("Quitting time") 
      workers.foreach(_ !? 0L) 
      } else { 
      println("Arrival at dispatcher: i: " + i + " dispatch time: " + 
        time + ", elapsed: " + (System.nanoTime() - time)) 
      sender ! time 
      } 
     case 'Quit => 
      println("Told to quit") 
      sender ! 'OffDuty 
      exit() 
     } 
    } 
    } 
} 

class Worker(id: Int) 
extends Actor { 
    println("Worker(" + id + ") created") 
    var jobs = 0 

    def act() { 
    Dispatcher ! 'GiveMeWork 

    loop { 
     react { 
     case time: Long => 
      if (time == 0) { 
      println("Worker(" + id + ") completed " + jobs + " jobs") 
      sender ! 'OffDuty 
      exit() 
      } else { 
      println("Arrival at worker(" + id + "): dispatch time: " + 
        time + ", elapsed: " + (System.nanoTime() - time)) 
      Thread.sleep(id) 
      jobs += 1 
      Dispatcher ! 'GiveMeWork 
      } 
     } 
    } 
    } 
} 

val workQueue = new LinkedBlockingQueue[(Long, Int)](1000) 

Dispatcher.start() 

for (i <- 0 until 5000) { 
    Thread.sleep(1) 
    workQueue.put((System.nanoTime(), i)) 
} 

workQueue.put((0L, 0)) 

println("Telling Dispatcher to quit") 
Dispatcher !? 'Quit 

回答

3

有一场比赛:

val (time, i) = workQueue.take() 

所有的工作都完成了,包括workQueue.put((0L, 0)),所以它会一直等下去。

同时使用不同类型的并发是一个坏主意。

调度员可以告知任务源有关任务限制:

import scala.actors.{Actor, OutputChannel} 
import scala.collection.mutable.Queue 

case class Task(time: Long, i: Int) 
case object GiveMeWork 
case object Quit 
case object OffDuty 

object Dispatcher extends Actor { 
    println("Dispatcher created") 

    def act() { 
    val workers = (1 to 4).map(id => (new Worker(id)).start()) 
    val waitingWorkers = Queue[OutputChannel[Any]](workers: _*) 
    val tasks = Queue[Task]() 
    var workSender: Option[OutputChannel[Any]] = None 

    loop { 
     react { 
     case GiveMeWork => 
      if (!tasks.isEmpty) sender ! tasks.dequeue() 
      else waitingWorkers enqueue sender 

      workSender map { _ ! GiveMeWork } 
      workSender = None 
     case t: Task => 
      if (!waitingWorkers.isEmpty) waitingWorkers.dequeue() ! t 
      else tasks enqueue t 

      if (tasks.length < 1000) sender ! GiveMeWork 
      else workSender = Some(sender) 
     case Quit => 
      println("Told to quit") 
      workers.foreach{ _ ! Quit } 
      sender ! OffDuty 
      exit() 
     } 
    } 
    } 
} 

class Worker(id: Int) 
extends Actor { 
    var jobs = 0 

    def act() { 
    loop { 
     react { 
     case t: Task => 
      Thread.sleep(id) 
      jobs += 1 
      Dispatcher ! GiveMeWork 
     case Quit => 
      println("Worker(" + id + ") completed " + jobs + " jobs") 
      sender ! OffDuty 
      exit() 
     } 
    } 
    } 
} 

Dispatcher.start() 

for (i <- 0 until 5000) { 
    Thread.sleep(1) 
    Dispatcher !? Task(System.nanoTime(), i) 
} 

println("Telling Dispatcher to quit") 
Dispatcher !? Quit 
+0

我不喜欢使用的LinkedBlockingQueue,但我需要限制的等待执行的最大进程数。当我在实际代码中使用此模式时,由于生成工作的方法(目录递归下降),我得到了内存不足错误,添加的文件比它们可以处理的快。有关如何纯粹与演员合作的建议? – Ralph

+0

@Ralph我已经更新了我的答案。 – senia