2016-08-01 57 views
2

我正在寻找关于在Spark 1.6 ML库中实现并行LBFGS和OWLQN算法的文档。优化器LBFGS OWLQN实现

我找到了这个网页为1.6:http://spark.apache.org/docs/1.6.1/ml-advanced.html但没有关于并行化

为2.0:http://spark.apache.org/docs/2.0.0/ml-advanced.html但仍一无所知并行

最后,我阅读代码[链接1]。方法

def train(dataset: DataFrame): LogisticRegressionModel 

似乎使用微风优化模型,但我没有找到火花功能被称为(地图,flatMap,减少,...)。

在代码[link2]中,map用于计算被减少为计算梯度的子梯度。

由于

+0

链接1:https://github.com/apache/spark/blob/v1.6.0/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala – DLSpark

+0

链接2:HTTPS:/ /github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala – DLSpark

回答

2

总之,火花采用微风LBFGS和OWLQN优化算法,并提供它们各自具有一种方法来计算成本函数的梯度在每次迭代。

例如,Spark的LogisticRegression类利用LogisticCostFun类,它扩展了Breeze的DiffFunction特征。这种成本函数类实现有签名calculate抽象方法:

override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) 

的计算方法利用LogisticAggregator类,它是真正的工作就完成了。聚合类定义了两个重要的方法:

def add(instance: Instance): this.type // the gradient update equation is hard-coded here 
def merge(other: LogisticAggregator): this.type // just adds other's gradient to the current gradient 

add方法定义了一种方法添加的单个数据点之后更新梯度,和合并的方法定义了一种方法,以两个分开的聚合结合起来。这个类被运送给执行者,用于聚合每个数据分区,然后用于将所有分区聚合器合并为一个聚合器。最终的聚合器实例保存当前迭代的累积梯度,并用于更新驱动器节点上的系数。这个过程是通过调用在LogisticCostFun类控制treeAggregate

val logisticAggregator = { 
    val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) 
    val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) 

    instances.treeAggregate(
    new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean) 
)(seqOp, combOp) 
} 

可以多一点简单地认为它是这样的:微风实现了几个不同的优化方法(例如LBFGS,OWLQN),只需要你告诉优化方法如何计算梯度。 Spark告诉Breeze算法如何通过LogisticCostFun类来计算梯度。 LogisticCostFun只是说要发送一个LogisticAggregator实例到每个分区,收集渐变更新,然后将它们发回并合并到驱动程序中。

+0

非常感谢。这正是我所期待的。 – DLSpark