我已经使用akka的演员模型构建了分布式流式机器学习模型。训练模型是通过向训练者发送一个训练实例(训练数据)来异步发生的。对这些数据进行训练会占用计算时间并改变参与者的状态。控制消耗大量内存的Akka中演员的产卵
当前我正在使用历史数据来训练模型。我想运行一系列不同配置的模型,这些模型在相同的数据上进行训练,并了解不同的集合度量标准是如何变化的。本质上,这是一个非常复杂的模拟,我用Thread.sleep(1)和表示计算时间和状态的数据数组进行了仿真。
implicit val as = ActorSystem()
case object Report
case class Model(dataSize: Int) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Thread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val trainingData = Array.fill(5000)(Random.nextInt)
val dataSizeParams = (1 to 500)
接下来我用一个for循环改变参数(由dataSizeParams阵列代表)
for {
param <- dataSizeParams
} {
// make model with params
val model = Model(param)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
}
for循环肯定是错误的方法做我想要做的事。它并行启动所有不同的模型。当dataSizeParams在1到500的范围内时,它工作的很好,但如果我把它提高到某个高度,我的Models EACH开始占用大量的内存。我想到的是下面的代码。基本上我有一个模型主人,他可以根据他收到的运行信息的数量来控制一次运行的模型数量。每个型号现在包含这个主演员的参考,当他做处理送他一个消息:
// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run
case class Model(dataSize: Int, master: ActorRef) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Tread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
master ! ImDone
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val master: ActorRef = actor(new Act {
var paramRuns = dataSizeParams.toIterator
become {
case Run => {
if (paramRuns.hasNext) {
val model = Model(paramRuns.next(), self)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
} else {
println("No more to run")
context.stop(self)
}
}
case ImDone => {
self ! Run
}
}
})
master ! Run
没有与主代码的任何问题(我可以看到)。我对一次产生的模型数量有着严格的控制,但我觉得我错过了一个更简单/干净/现成的方法。另外我想知道是否有任何简单的方法来节制一次运行的模型数量,比如说查看系统的CPU和内存使用情况。
正在使用一个路由器和一个演员池不是你的选择?模型构建器池将被限制为由您设置的大小,并且每个池模型构建器都不会移动到邮箱中的下一条消息,直到完成构建其当前模型,从而更好地控制内存和CPU。 – cmbaxter
这很有趣。我们尝试不使用Akka手动管理线程,最终管理演员......我遇到了类似的问题。看来for for循环创建演员是一个非常糟糕的主意。 –