2010-11-11 54 views
7

试图获得如何根据actor而不是线程思考的句柄。我有点难倒以下用例:从线程模型转换到演员

考虑一个创建工作(例如,通过读取数据从文件)一个生产者进程的系统,和一个数字,消费工作的工作进程(例如解析数据并将其写入数据库)。工作的产生和消耗速度可能会有所不同,并且系统应该保持稳定。例如,如果工人无法跟上,制片人应该检测到这一点,并最终放慢速度或等待。

这是很容易使用线程来实现:

val producer:Iterator[Work] = createProducer() 
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) 
val workers = (0 until NUM_WORKERS) map { i => 
    new Thread() { 
    override def run() = { 
     while (true) { 
     try { 
      // take next unit of work, waiting if necessary 
      val work = queue.take() 
      process(work) 
     } 
     catch { 
      case e:InterruptedException => return 
     } 
     } 
    } 
    } 
} 

// start the workers 
workers.foreach(_.start()) 

while (producer.hasNext) { 
    val work = producer.next() 
    // add new unit of work, waiting if necessary 
    queue.put(work) 
} 

while (!queue.isEmpty) { 
    // wait until queue is drained 
    queue.wait() 
} 

// stop the workers 
workers.foreach(_.interrupt()) 

有没有什么不对的模型是,我已经成功地使用过。这个例子可能过于冗长,因为使用Executor或CompletionService可以很好地适应这个任务。但我喜欢演员的抽象,并且认为在很多情况下推理是比较容易的。有没有办法使用actors重写这个例子,特别是确保没有缓冲区溢出(例如完整的邮箱,丢弃的消息等)?

回答

3

因为参与者处理消息“脱机”(即消息的消耗与他们正在接收的消息不相关),所以很难看到你如何能够准确地模拟“生产者等待消费者赶上”。

我能想到的唯一的事情是,消费者要求从生产者演员的工作(使用reply):

case object MoreWorkPlease 
class Consumer(prod : Producer) extends Actor { 
    def act = { 
    prod ! MoreWorkPlease 
    loop { 
     react { 
     case Work(payload) => doStuff(payload); reply(MoreWorkPlease) 
     } 
    } 
    } 
} 

class Producer extends Actor { 
    def act = loop { 
    react { 
     case MoreWorkPlease => reply(Work(getNextItem)) 
    } 
    } 
} 

这是不完美的,当然,因为生产者不“读转发“,只有当消费者准备好时才能获得工作。用法可能类似于:

val prod = new Producer 
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start()) 
prod.start() 
+0

Hrm,这是我想到的一种解决方案。这可能就足够了,但我担心的是,如果工人超过生产者,那么缺少工作缓冲区会导致性能下降。 – toluju 2010-11-11 23:03:37

+0

@toluju - 开始让每个消费者都要求工作,并让生产者对这些消息不作出反应,但要接收它们,并在没有更多工作要做时将它们放入队列中。 (然后,一旦有工作,就可以将其划分到队列中的项目。) – 2010-11-12 00:08:18