2017-06-22 63 views
0

我有100个线程,每次只需要处理12个线程,但不会超过这个线程。这些线程完成后,其他12个线程必须被处理,但是它只处理前12个线程,然后在那之后终止。
这里是我的逻辑:Akka线程调整

class AkkaProcessing extends Actor { 
    def receive = { 
case message: List[Any] => 
var meterName = message(0) // It Contains only 12 threads , it process them and terminates. Am unable to get remaining threads 

val sqlContext = message(1).asInstanceOf[SQLContext] 
val FlagDF = message(2).asInstanceOf[DataFrame] 
     { 

       All the business logic here 
      } 

     context.system.shutdown() 
    } 
    } 
} 
object Processing { 
    def main(args: Array[String]) = { 
    val rawBuff = new ArrayBuffer[Any]() 
    val actorSystem = ActorSystem("ActorSystem") // Creating ActorSystem 
    val actor = actorSystem.actorOf(Props[AkkaProcessing].withRouter(RoundRobinPool(200)), "my-Actor") 
    implicit val executionContext = actorSystem.dispatchers.lookup("akka.actor.my-dispatcher") 

    for (i <- 0 until meter_list.length) { 

    var meterName = meter_list(i)  // All 100 Meters here 

    rawBuff.append(meterName, sqlContext, FlagDF) 
    actor ! rawBuff.toList 
    } 
    } 
    } 

任何输入的高度赞赏

回答

0

我想你可能是最好的创建2种演员类型:消费类(在并行运行)和协调(这需要12个线程任务并将它们传递给消费者)。协调员将等待消费者完成,然后运行下一批。

看到这个答案的代码示例:Can Scala actors process multiple messages simultaneously?

如果做不到这一点,你可以只使用期货以类似的方式。