2014-10-10 148 views
5

我试图在没有使用Queue的情况下在scala中实现Producer Consumer程序。因为我认为Actor已经实现了“邮件队列”或其他的东西,所以再次编写代码将是多余的。在scala中实现Producer Consumer的正确方法是什么

我试图纯粹在Actor中编写程序。 下面是一个多生产者多个消费者程序。 生产者睡一会儿,模拟做某事。消费者根本不睡觉。

但是我不知道怎么关闭程序,如果我不添加主管演员监控消费者,以及使用“等待”(代码中的超级类)

是一种承诺对象无论如何摆脱他们?

import akka.actor.Actor.Receive 
import akka.actor._ 
import akka.routing._; 
import akka.util._ 

import scala.concurrent.{Await, Promise} 
import scala.concurrent.duration._ 

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    def receive = { 
    case _ => 
     while (true) { 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     } 
    } 
} 

class Consumer(supervisor : ActorRef)(val name:String) extends Actor { 

    var counter = 0 

    def receive = { 
    case s => 
     counter += 1 
     println("%s eat food produced by %s" format (name,s)) 

     if (counter >= 10) { 
     println("%s is full" format name) 

     context.stop(self) 
     supervisor ! 1 
     } 
    } 
} 

class Supervisor(p:Promise[String]) extends Actor { 

    var r = 3 

    def receive = { 
    case _ => 
     r -= 1 
     if (0 == r) { 
     println("All consumer stopped") 
     context.stop(self) 
     p success ("Good") 
     } 
    } 

} 

object Try3 { 

    def work(): Unit = { 
    val system = ActorSystem("sys1") 
    val nProducer = 5; 
    val nConsumer = 3; 
    val p = Promise[String] 
    val supervisor = system.actorOf(Props(new Supervisor(p))); 
    val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i)))) 
    val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer))) 
    val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i)))) 

    arrProducer foreach (_ ! "start") 

    Await.result(p.future,Duration.Inf) 
    println("great!") 
    system.shutdown 
    } 

    def main(args:Array[String]): Unit = { 
    work() 
    } 
} 

接收功能产生类有一个问题,它不会被关闭,因为它虽然没有打破的条件。

我能想到的唯一方法是“向制作者本身发送信息”。 我不知道这是实现这种请求的正常方式吗?

下面是修改代码:

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    // original implementation: 
    // def receive = { 
    // case _ => 
    // while (true){ 
    //  val sleepTime = scala.util.Random.nextInt(1000) 
    //  Thread.sleep(sleepTime) 
    //  println("Producer %s send food" format name) 
    //  pool ! name 
    // } 
    // } 

    case object Loop; 

    def receive = { 
    case _ => 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     self ! Loop //send message to itself 
    } 
} 

不管我的实现,什么是Scala实现生产者消费者程序的正确方法,与演员或未来/无极?

回答

2

你不应该在actor中阻塞(在你的情况下是Thread.sleep,while循环)。在演员内部阻止从所有演员中使用的线程池中获取线程。即使像你这样少量的制作者也会让ActorSystem中的所有actor脱离线程并使其无法使用。

取而代之的是使用Scheduler来定期在Producer中定期发送meesage。

override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    import context.dispatcher 
    context.system.scheduler.schedule(
    initialDelay = 0.seconds, 
    interval = 1.second, 
    receiver = pool, 
    message = name 
) 
} 
+1

谢谢@Martynas什么。你解决了我的“循环”问题。我仍在寻求生产者 - 消费者优雅实施的答案。 – worldterminator 2014-10-10 07:26:22

0

你想想实现Terminator演员:)

object Terminator { 
    case class WatchMe(ref: ActorRef) 
} 
class Terminator extends Actor { 
    var consumers: Map[ActorRef, ActorRef] = Map() 

    def receive = { 
     case WatchMe(ref) => { 
     consumers += ref -> ref 
     context.watch(ref) 
     } 
     case Terminated(ref) => { 
     context.unwatch(ref) 
     consumers.get(ref).foreach { ref -> ref ! PoisonPill } 
     consumers -= ref 
     //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy 
     if(consumers.size == 0) { 
      delegate() 
      context.stop(self) 
     } 
     } 
    } 
} 
相关问题