2014-01-10 44 views
3

我已经使用akka的演员模型构建了分布式流式机器学习模型。训练模型是通过向训练者发送一个训练实例(训练数据)来异步发生的。对这些数据进行训练会占用计算时间并改变参与者的状态。控制消耗大量内存的Akka中演员的产卵

当前我正在使用历史数据来训练模型。我想运行一系列不同配置的模型,这些模型在相同的数据上进行训练,并了解不同的集合度量标准是如何变化的。本质上,这是一个非常复杂的模拟,我用Thread.sleep(1)和表示计算时间和状态的数据数组进行了仿真。

implicit val as = ActorSystem() 

case object Report 

case class Model(dataSize: Int) { 
    val modelActor: ActorRef = actor(new Act { 
    val data = Array.fill(dataSize)(0) 
    become { 
     case trainingData: Int => { 
     // Screw with the state of the actor and pretend that it takes time 
     Thread.sleep(1) 
     data(Math.abs(Random.nextInt % dataSize)) == trainingData 
     } 
     case Report => { 
      println(s"Finished $dataSize") 
      context.stop(self) 
     } 
     } 
    }) 

    def train(trainingInstance: Int) = modelActor ! trainingInstance 

    def report: Unit = modelActor ! Report 
} 

val trainingData = Array.fill(5000)(Random.nextInt) 

val dataSizeParams = (1 to 500) 

接下来我用一个for循环改变参数(由dataSizeParams阵列代表)

for { 
    param <- dataSizeParams 
} { 
    // make model with params 
    val model = Model(param) 
    for { 
    trainingInstance <- trainingData 
    } { 
    model.train(trainingInstance) 
    } 
    model.report 
} 

for循环肯定是错误的方法做我想要做的事。它并行启动所有不同的模型。当dataSizeParams在1到500的范围内时,它工作的很好,但如果我把它提高到某个高度,我的Models EACH开始占用大量的内存。我想到的是下面的代码。基本上我有一个模型主人,他可以根据他收到的运行信息的数量来控制一次运行的模型数量。每个型号现在包含这个主演员的参考,当他做处理送他一个消息:

// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor 
case object ImDone 
case object Run 

case class Model(dataSize: Int, master: ActorRef) { 
    val modelActor: ActorRef = actor(new Act { 
    val data = Array.fill(dataSize)(0) 
    become { 
     case trainingData: Int => { 
     // Screw with the state of the actor and pretend that it takes time 
     Tread.sleep(1) 
     data(Math.abs(Random.nextInt % dataSize)) == trainingData 
     } 
     case Report => { 
      println(s"Finished $dataSize") 
      master ! ImDone 
      context.stop(self) 
     } 
     } 
    }) 

    def train(trainingInstance: Int) = modelActor ! trainingInstance 

    def report: Unit = modelActor ! Report 
} 

val master: ActorRef = actor(new Act { 
    var paramRuns = dataSizeParams.toIterator 
    become { 
    case Run => { 
     if (paramRuns.hasNext) { 
     val model = Model(paramRuns.next(), self) 
     for { 
      trainingInstance <- trainingData 
     } { 
      model.train(trainingInstance) 
     } 
     model.report 
     } else { 
     println("No more to run") 
     context.stop(self) 
     } 
    } 
    case ImDone => { 
     self ! Run 
    } 
    } 
}) 

master ! Run 

没有与主代码的任何问题(我可以看到)。我对一次产生的模型数量有着严格的控制,但我觉得我错过了一个更简单/干净/现成的方法。另外我想知道是否有任何简单的方法来节制一次运行的模型数量,比如说查看系统的CPU和内存使用情况。

+0

正在使用一个路由器和一个演员池不是你的选择?模型构建器池将被限制为由您设置的大小,并且每个池模型构建器都不会移动到邮箱中的下一条消息,直到完成构建其当前模型,从而更好地控制内存和CPU。 – cmbaxter

+0

这很有趣。我们尝试不使用Akka手动管理线程,最终管理演员......我遇到了类似的问题。看来for for循环创建演员是一个非常糟糕的主意。 –

回答

2

您正在寻找拉动模式的工作。我在阿卡开发者强烈建议这个博客帖子:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2

我们使用的这种变体上的阿卡的集群功能外,以避免流氓并发。通过让工作者演员拉动而不是让主管工作,您可以通过简单地限制工作人员的数量来优雅地控制工作量(以及CPU和内存使用情况)。

与纯粹的路由器相比,它有一些优势:跟踪失败更容易(如该文章中所述),并且工作不会在邮箱中衰弱(可能会丢失)。另外,如果您使用远程处理,我建议您而不是在消息中发送大量数据。触发时让工作人员节点自己从另一个来源抽取数据。我们使用S3。