2015-11-18 45 views
1

我使用Spark MLLib k-Means,它要求功能具有相同的尺寸。这些特征是使用直方图计算的,所以我必须使用固定大小的箱子。 Hive具有内置函数histogram_numeric(col,b) - 使用b个非均匀间隔的bin计算组中数值列的直方图。什么是最好的方法以及如何在直方图中使用b个固定大小的容器?如何在Hive中的直方图中使用固定大小的大小?

回答

1

一个处理这种可能的方式是创建一个UDF这样

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions._ 

def get_bucket(breaks: Array[Double]) = udf(
    (x: Double) => 
    scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1)) 

让我们假设你的数据看起来与此类似:

val df = sc.parallelize(Seq(
    (1, 1.0), (1, 2.3), (1, 0.4), (1, 2.1), (1, 3.5), (1, 9.0), 
    (2, 3.6), (2, 0.2), (2, 0.6), (2, 0.1), (2, 4.0), (2, -1.0) 
)).toDF("k", "v") 

其中k标识点和v是你的值想用来计算直方图。

val breaks = Array(0.0, 1.0, 2.0, 3.0, 4.0) 

val dfWithBuckets = df 
    .withColumn("bucket", get_bucket(breaks)($"v")) 
    .groupBy($"k", $"bucket") 
    .agg(count(lit(1))) 

dfWithBuckets.show() 
// +---+------+--------+ 
// | k|bucket|count(1)| 
// +---+------+--------+ 
// | 1|  1|  1| 
// | 1|  2|  1| 
// | 1|  3|  2| 
// | 1|  4|  1| 
// | 1|  5|  1| 
// | 2|  0|  1| 
// | 2|  1|  3| 
// | 2|  4|  1| 
// | 2|  5|  1| 
// +---+------+--------+ 

最后上面的数据可以被收集,归纳,并转化为向量:

import org.apache.spark.mllib.linalg.Vectors 

def toVector(xs: Iterable[(Int, Long)], n: Int) = { 
    val sorted = xs.toArray.sorted 
    val indices = sorted.map(_._1) 
    val values = sorted.map(_._2.toDouble) 
    Vectors.sparse(n, indices, values) 
} 

val vectors = dfWithBuckets.map{ 
    case Row(k: Int, b: Int, cnt: Long) => 
    (k, (b, cnt))} 
    .groupByKey 
    .mapValues(vs => toVector(vs, breaks.size + 1)) 

vectors.collect 
// Array[(Int, org.apache.spark.mllib.linalg.Vector)] = Array(
// (1,(6,[1,2,3,4,5],[1.0,1.0,2.0,1.0,1.0])), 
// (2,(6,[0,1,4,5],[1.0,3.0,1.0,1.0]))) 
+0

感谢您的有益的建议!如果没有其他选择,我会使用你的方法。在我的情况下,我有:'myDfWithBuckets.groupBy($“bucket”)。agg(callUDF(“histogram_numeric”,$“col1”,lit(n)))''。如果n是固定大小的容器,那么我就完成了。 histogram_numeric的问题是,在n = 40的情况下,有时我只回收12个桶,有时20个桶。我需要固定数量的桶。 – wdz