2015-03-02 49 views

回答

1

将RDD转换为Double的RDD,然后使用.histogram(10)操作。见DoubleRDD ScalaDoc

+3

.histogram(bucketCount)发现了这个蜂巢UDAF不算百分点,这“计算的直方图使用bucketCount桶的数量*在RDD的最小值和最大值之间均匀分布“ – Dmitry 2016-04-04 21:11:39

19

您可以:

  1. 排序通过rdd.sortBy()
  2. 计算通过rdd.count的数据集的大小()
  3. 邮编索引,以方便检索百分的
  4. 通过rdd.lookup()获取想要的百分位数例如为第10百分位rdd.lookup(0.1 *大小)

为了计算中位数和第99百分位数: getPercentiles(RDD,新的双[] {0.5,0.99},大小,numPartitions);

在Java 8:

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) { 
    double[] values = new double[percentiles.length]; 

    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions); 
    JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap()); 

    for (int i = 0; i < percentiles.length; i++) { 
     double percentile = percentiles[i]; 
     long id = (long) (rddSize * percentile); 
     values[i] = indexed.lookup(id).get(0); 
    } 

    return values; 
} 

注意,这需要排序数据集,O(n.log(N)),并且可以是对大数据集昂贵。

另一个建议简单计算直方图的答案将无法正确计算百分比:这里是一个反例:由100个数字组成的数据集,99个数字为0,一个数字为1。最终得到所有99 0在第一个垃圾箱中,最后一个垃圾箱中的1,中间有8个空垃圾箱。

0

另一种替代方法是使用double的RDD上的顶部和最后一个。例如,val percentile_99th_value = scores.top((count/100).toInt).last

此方法更适合个别百分位数。

3

我发现了这个要点

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

,包含以下功能:

/** 
    * compute percentile from an unsorted Spark RDD 
    * @param data: input data set of Long integers 
    * @param tile: percentile to compute (eg. 85 percentile) 
    * @return value of input data at the specified percentile 
    */ 
    def computePercentile(data: RDD[Long], tile: Double): Double = { 
    // NIST method; data to be sorted in ascending order 
    val r = data.sortBy(x => x) 
    val c = r.count() 
    if (c == 1) r.first() 
    else { 
     val n = (tile/100d) * (c + 1d) 
     val k = math.floor(n).toLong 
     val d = n - k 
     if (k <= 0) r.first() 
     else { 
     val index = r.zipWithIndex().map(_.swap) 
     val last = c 
     if (k >= c) { 
      index.lookup(last - 1).head 
     } else { 
      index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head) 
     } 
     } 
    } 
    } 
3

这是我在星火Python实现用于计算包含感兴趣的值的RDD百分。

def percentile_threshold(ardd, percentile): 
    assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100" 

    return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \ 
      .lookup(np.ceil(ardd.count()/100 * percentile - 1))[0] 

# Now test it out 
import numpy as np 
randlist = range(1,10001) 
np.random.shuffle(randlist) 
ardd = sc.parallelize(randlist) 

print percentile_threshold(ardd,0.001) 
print percentile_threshold(ardd,1) 
print percentile_threshold(ardd,60.11) 
print percentile_threshold(ardd,99) 
print percentile_threshold(ardd,99.999) 
print percentile_threshold(ardd,100) 

# output: 
# 1 
# 100 
# 6011 
# 9900 
# 10000 
# 10000 

另外,我定义了以下函数以获得第10到第100百分位数。

def get_percentiles(rdd, stepsize=10): 
    percentiles = [] 
    rddcount100 = rdd.count()/100 
    sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) 


    for p in range(0, 101, stepsize): 
     if p == 0: 
      pass 
      # I am not aware of a formal definition of 0 percentile, 
      # you can put a place holder like this if you want 
      # percentiles.append(sortedrdd.lookup(0)[0] - 1) 
     elif p == 100: 
      percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0]) 
     else: 
      pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0] 
      percentiles.append(pv) 

    return percentiles 

randlist = range(1,10001) 
np.random.shuffle(randlist) 
ardd = sc.parallelize(randlist) 
get_percentiles(ardd, 10) 

# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000] 
+0

不应该在'get_percentiles'的'sortedrdd'定义中将'ardd'替换为'rdd'?以及添加'导入numpy为np'。 IOT似乎不适用于'numpy 1.11.3' – Jorge 2017-08-24 07:27:00

5

如何T-消化

https://github.com/tdunning/t-digest

准确线上累积基于秩统计的新的数据结构,如分位数和修剪装置。 t-digest算法也非常平行,使其在地图缩减和并行流应用程序中非常有用。

t消化构造算法使用一维k均值聚类的变体来生成与Q摘要相关的数据结构。这个t消化数据结构可用于估计分位数或计算其他等级统计。 t-digest优于Q-digest的优点在于t-digest可以处理浮点值,而Q-digest仅限于整数。只要稍作修改,t-digest就可以处理任何有序集合中的任何值,这些集合的含义与平均值相似。尽管t-digests存储在磁盘上时更加紧凑,但由t-digests产生的分位数估计的准确性可能比Q-digest所产生的分位数精确得多。

综上所述,T-消化的特别有趣的特点是,它

  • 比Q-消化对双打以及整数
  • 作品小总结。
  • 提供每百万精度极端位数和通常<为中间1000ppm的准确性部分分位数
  • 是快速
  • 非常简单
  • 具有具有> 90%的测试覆盖率的参考实现
  • 可以是所使用的map-reduce很容易,因为消化可以合并

它应该是相当容易使用的参考Java的implem来自Spark的诱惑。

+1

其实Erik Erlandson在这里有一个火花实现:https://github.com/isarn/isarn-sketches-spark。它效果很好。我发现唯一的解决方案就是不能将TDigest对象保存为parquet格式。只要你只是扔了大量的数据,并要求获得一些百分点的结果,那就是你正在寻找的东西:) – 2018-01-16 19:56:46

2

如果您不介意将您的RDD转换为DataFrame并使用Hive UDAF,则可以使用percentile。假设你装HiveContext hiveContext到范围:

hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")

this answer.