2012-06-21 21 views
1

我正在使用akka演员负载生成器应用程序。该应用程序可以正常运行数百万次的请求,但是当使用无限循环将负载增加到超过1000万个请求或运行负载一段时间(而不是请求数量)时,应用程序将挂起。下面是一个简化的实现,它只是打印正在测试的命令。我还注意到,统计信息没有记录,或者当时间结束时,应用程序不会关闭。我使用调度程序每30秒钟转储统计信息,并在2小时后关闭应用程序。以小的时间间隔进行测试,并且没有看到“统计”和“关机”消息的处理。Akka Actor应用程序挂在大批量下

任何想法可能导致应用程序挂起?

import akka.actor._ 
import akka.util.duration._ 
import akka.routing.RoundRobinRouter 
import com.test.redload.util.CommandGenerator 
import org.apache.log4j.Logger 
import akka.util.Duration 

class LoadWorker extends Actor { 
    val log = Logger.getLogger(this.getClass().getName()) 
    def receive = { 
    case "PUT" => sender ! PUT 
    case "GET" => sender ! GET 
    case "DELETE" => sender ! DELETE 
    case "POST" => sender ! POST 
    case "HEAD" => sender ! HEAD 
    } 
    def PUT():Boolean = {println("PUT");return true} 
    def GET():Boolean = {println("GET");return true} 
    def DELETE():Boolean = {println("DELETE");return true} 
    def POST():Boolean = {println("POST");return true} 
    def HEAD():Boolean = {println("HEAD");return true} 
} 

class LoadGenerator(nrOfWorkers:Int, noOfMessages:Int) extends Actor { 

    val log = Logger.getLogger(this.getClass().getName()) 
    val start:Long = System.currentTimeMillis 
    var noOfMessageRcvd:Int = 0 
    val r = new CommandGenerator// <- is basically are list implementation that iterates and returns the next command 
    r.addCommand("PUT",5) r.addCommand("GET",2) r.addCommand("DELETE",2) 
    r.addCommand("POST",2) r.addCommand("HEAD",1) r.addCommand("LBRPOP",1) 

    val loadRouter = context.actorOf(Props[LoadWorker].withRouter(RoundRobinRouter(nrOfWorkers)),name ="loadRouter") 

    def receive = { 
    case "start" => { 
     if(noOfMessages > 1) { 
     for(i <- 0 until noOfMessages) loadRouter ! r.getRandomCommand() 
     } else { 
     log.info("Time bound Load run..") 
     //for(i <- 0 until 10000000) { //<- For any number greater than few millions that app hangs after few messages 
     while(true){loadRouter ! r.getRandomCommand() //<- with while loop the app hangs as soon as it begins 
     } 
     } 
    } 
    case true => { 
      noOfMessageRcvd +=1 
      if(noOfMessages == noOfMessageRcvd){ 
      self ! "shutdown" 
      } 
    } 
    case "stats" => { 
      logStats() 
    } 
    case "shutdown" => { 
      logStats() 
      log.info("Shutting Down!") 
      context.system.shutdown() 
    } 
    } 
    def logStats(){ 
    var duration = (System.currentTimeMillis - start)/1000 
    if(duration > 0) { 
     log.info(noOfMessageRcvd+" messages processed in "+duration +" seconds " 
     + "at "+ noOfMessageRcvd/duration +" TPS") 
    } else { 
     log.info(noOfMessageRcvd+" messages processed in less than a second ") 
    } 
    } 
} 

object RedLoad extends App{ 
    val log = Logger.getLogger(this.getClass().getName()) 
    val system = ActorSystem("LoadGeneratorApp"); 
    // -1 is if we want to run for a period of time and > 1 the run will end after the messages are procesed 
    val lg = system.actorOf(Props(new LoadGenerator(100,-1)),"LG") 
    //Log the stats every 30 seconds 
    system.scheduler.schedule(0 seconds,30 seconds,lg,"stats") 
    //Shutdown the load run after 2 hours, if no of message is > -1 then it will shutdown after 
    //all messages are processed 
    system.scheduler.scheduleOnce(2 hours,lg,"shutdown") 
    lg ! "start" 
    log.info("Started..") 
} 
+0

请粘贴您的演员配置。我怀疑线程挨饿。 – opyate

+0

我已经添加到JVM的唯一配置是“-Dactors.corePoolSize = 20” – mzafer

回答

2

那么,你的演员一次不能处理一条以上的信息,而你只是把它设置为永远忙于发送信息。按设计工作。删除无限循环并将​​批量消息发送到loadRouter,并发送自己的继续消息以继续发送更多消息。

case SendBatch => 
    (1 to batchSize) foreach { router ! message } 
    self ! SendBatch 
+0

你能解释一个关于“发送你自己的连续消息”的更多信息吗? – mzafer

+0

好,澄清。 –

+0

我可以对脚本做一些改进,但你的回答解决了这个问题。谢谢 – mzafer