总之,火花采用微风LBFGS和OWLQN优化算法,并提供它们各自具有一种方法来计算成本函数的梯度在每次迭代。
例如,Spark的LogisticRegression
类利用LogisticCostFun
类,它扩展了Breeze的DiffFunction
特征。这种成本函数类实现有签名calculate
抽象方法:
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double])
的计算方法利用LogisticAggregator
类,它是真正的工作就完成了。聚合类定义了两个重要的方法:
def add(instance: Instance): this.type // the gradient update equation is hard-coded here
def merge(other: LogisticAggregator): this.type // just adds other's gradient to the current gradient
add方法定义了一种方法添加的单个数据点之后更新梯度,和合并的方法定义了一种方法,以两个分开的聚合结合起来。这个类被运送给执行者,用于聚合每个数据分区,然后用于将所有分区聚合器合并为一个聚合器。最终的聚合器实例保存当前迭代的累积梯度,并用于更新驱动器节点上的系数。这个过程是通过调用在LogisticCostFun
类控制treeAggregate
:
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
instances.treeAggregate(
new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
)(seqOp, combOp)
}
可以多一点简单地认为它是这样的:微风实现了几个不同的优化方法(例如LBFGS,OWLQN),只需要你告诉优化方法如何计算梯度。 Spark告诉Breeze算法如何通过LogisticCostFun
类来计算梯度。 LogisticCostFun
只是说要发送一个LogisticAggregator
实例到每个分区,收集渐变更新,然后将它们发回并合并到驱动程序中。
链接1:https://github.com/apache/spark/blob/v1.6.0/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala – DLSpark
链接2:HTTPS:/ /github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala – DLSpark