12

I have a DataFrame with two columns: ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector). How can I define a custom aggregate function to sum a column of vectors?

The DataFrame looks like this:

ID,Vec 
1,[0,0,5] 
1,[4,0,1] 
1,[1,2,1] 
2,[7,5,0] 
2,[3,3,4] 
3,[0,8,1] 
3,[0,0,1] 
3,[7,7,7] 
.... 

I would like to do a groupBy($"ID") and then apply an aggregation that sums the vectors of the rows within each group.

The desired output for the example above would be:

ID,SumOfVectors 
1,[5,2,7] 
2,[10,8,4] 
3,[7,15,9] 
... 

The built-in aggregate functions won't work; for example, df.groupBy($"ID").agg(sum($"Vec")) results in a ClassCastException.

How can I implement a custom aggregate function that lets me sum vectors or arrays, or perform any other custom operation?

+3

[How can I define and use a user-defined aggregate function in Spark SQL?](http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user-defined-aggregate-function-in-spark-sql) –

Answers

19

Personally, I wouldn't bother with UDAFs. They are not only verbose but also not particularly fast. Instead, I would simply use reduceByKey/foldByKey:

import org.apache.spark.sql.Row 
import breeze.linalg.{DenseVector => BDV} 
import org.apache.spark.ml.linalg.{Vector, Vectors} 

// Vectors.parse only exists in org.apache.spark.mllib.linalg, so with the 
// ml.linalg API the vectors are built directly: 
val df = sc.parallelize(Seq( 
    (1, Vectors.dense(0, 0, 5)), (1, Vectors.dense(4, 0, 1)), (1, Vectors.dense(1, 2, 1)), 
    (2, Vectors.dense(7, 5, 0)), (2, Vectors.dense(3, 3, 4)), (3, Vectors.dense(0, 8, 1)), 
    (3, Vectors.dense(0, 0, 1)), (3, Vectors.dense(7, 7, 7)))).toDF("id", "vec") 

val aggregated = df 
  .rdd 
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } // to breeze, which supports arithmetic 
  .foldByKey(BDV(Array.fill(3)(0.0)))(_ += _)                        // in-place sum per key 
  .mapValues(v => Vectors.dense(v.toArray))                          // back to Spark vectors 
  .toDF("id", "vec") 

aggregated.show 

// +---+--------------+ 
// | id|           vec| 
// +---+--------------+ 
// |  1| [5.0,2.0,7.0]| 
// |  2|[10.0,8.0,4.0]| 
// |  3|[7.0,15.0,9.0]| 
// +---+--------------+ 
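
The answer mentions reduceByKey as well. For completeness, a sketch of that variant (not part of the original answer): breeze's + returns a new vector on each merge, so no fixed-size zero value is needed.

val aggregated2 = df 
  .rdd 
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } 
  .reduceByKey(_ + _) // breeze defines + on dense vectors 
  .mapValues(v => Vectors.dense(v.toArray)) 
  .toDF("id", "vec") 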

And just for comparison, a "simple" UDAF. Required imports:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
    UserDefinedAggregateFunction} 
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} 
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} 
import org.apache.spark.sql.Row 
import scala.collection.mutable.WrappedArray 

The class definition:

class VectorSum(n: Int) extends UserDefinedAggregateFunction { 
  def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) 
  def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) 
  def dataType = SQLDataTypes.VectorType 
  def deterministic = true 

  def initialize(buffer: MutableAggregationBuffer) = { 
    buffer.update(0, Array.fill(n)(0.0)) 
  } 

  def update(buffer: MutableAggregationBuffer, input: Row) = { 
    if (!input.isNullAt(0)) { 
      val buff = buffer.getAs[WrappedArray[Double]](0) 
      val v = input.getAs[Vector](0).toSparse 
      // only the non-zero entries need to be added 
      for (i <- v.indices) { 
        buff(i) += v(i) 
      } 
      buffer.update(0, buff) 
    } 
  } 

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
    val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
    val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
    for ((x, i) <- buff2.zipWithIndex) { 
      buff1(i) += x 
    } 
    buffer1.update(0, buff1) 
  } 

  def evaluate(buffer: Row) = Vectors.dense( 
    buffer.getAs[Seq[Double]](0).toArray) 
} 

And example usage:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show 

// +---+--------------+ 
// | id|           vec| 
// +---+--------------+ 
// |  1| [5.0,2.0,7.0]| 
// |  2|[10.0,8.0,4.0]| 
// |  3|[7.0,15.0,9.0]| 
// +---+--------------+ 
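
To call the UDAF from plain SQL, the instance can also be registered under a name. A minimal sketch (the function name vector_sum and the view name vectors are arbitrary):

spark.udf.register("vector_sum", new VectorSum(3)) 
df.createOrReplaceTempView("vectors") 
spark.sql("SELECT id, vector_sum(vec) AS vec FROM vectors GROUP BY id").show 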

See also: How to find mean of grouped Vector columns in Spark SQL?

+0

I see the trick is to use breeze.linalg.DenseVector. Why does it work there while mllib.linalg's dense vector doesn't? – Rami

+1

The problem is that the Scala version of 'mllib.linalg.Vector' has no '+' method. – zero323
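
A minimal illustration of that point (not from the original thread):

import breeze.linalg.{DenseVector => BDV} 
import org.apache.spark.mllib.linalg.Vectors 

BDV(1.0, 2.0) + BDV(3.0, 4.0)   // fine: breeze defines arithmetic, yields DenseVector(4.0, 6.0) 
// Vectors.dense(1.0, 2.0) + Vectors.dense(3.0, 4.0)   // does not compile: no '+' on Spark vectors 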

+0

Can't this be done with the DataFrame API or SQL? – oluies

0

I suggest the following (works on Spark 2.0.2 and later). It could probably be optimized further, but it works well. The one thing you have to know in advance, when you create the UDAF instance, is the vector size:

import org.apache.spark.ml.linalg._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class VectorAggregate(val numFeatures: Int) 
    extends UserDefinedAggregateFunction { 

  // the buffer is a sparse map of index -> running sum 
  private type B = Map[Int, Double] 

  // VectorUDT is package-private in ml.linalg; SQLDataTypes.VectorType is the public handle 
  def inputSchema: StructType = 
    StructType(StructField("vec", SQLDataTypes.VectorType) :: Nil) 

  def bufferSchema: StructType = 
    StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil) 

  def initialize(buffer: MutableAggregationBuffer): Unit = 
    buffer.update(0, Map.empty[Int, Double]) 

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val zero = buffer.getAs[B](0) 
    input match { 
      case Row(DenseVector(values)) => 
        buffer.update(0, values.zipWithIndex.foldLeft(zero) { 
          case (acc, (v, i)) => acc.updated(i, v + acc.getOrElse(i, 0d)) 
        }) 
      case Row(SparseVector(_, indices, values)) => 
        buffer.update(0, values.zip(indices).foldLeft(zero) { 
          case (acc, (v, i)) => acc.updated(i, v + acc.getOrElse(i, 0d)) 
        }) 
    } 
  } 

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    val zero = buffer1.getAs[B](0) 
    buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero) { 
      case (acc, (i, v)) => acc.updated(i, v + acc.getOrElse(i, 0d)) 
    }) 
  } 

  def deterministic: Boolean = true 

  def evaluate(buffer: Row): Any = { 
    val Row(agg: B) = buffer 
    val indices = agg.keys.toArray.sorted 
    // emit a sparse vector, compressed to dense when that is smaller 
    Vectors.sparse(numFeatures, indices, indices.map(agg)).compressed 
  } 

  def dataType: DataType = SQLDataTypes.VectorType 
}
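
Example usage, following the same pattern as the VectorSum example above (a sketch; the size passed to the constructor must match the vectors in the data):

val vagg = new VectorAggregate(3) 
df.groupBy($"id").agg(vagg($"vec") alias "vec").show 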