2

假设元组的RDD存在类似如下:Spark RDD:如何最有效地计算统计信息?

(key1, 1) 
(key3, 9) 
(key2, 3) 
(key1, 4) 
(key1, 5) 
(key3, 2) 
(key2, 7) 
... 

什么是计算对应于每个关键统计数据的最有效的(和,理想情况下,分布式)的方式? (此刻,我要找计算标准偏差/方差,尤其如此。)据我所知,我的选择权相当于:

  1. 使用colStats function in MLLib此方法具有易于适应优势如果认为有必要进行其他统计计算,则可以在以后使用其他mllib.stat函数。然而,它运行在包含每列数据的RDD Vector上,据我了解,这种方法需要在单个节点上收集每个密钥的全部值,这对于大型企业而言似乎并不理想数据集。 Spark Vector是否总是暗示Vector中的数据本地驻留在单个节点上?
  2. 执行groupByKey,然后stats可能洗牌重,as a result of the groupByKey operation
  3. 执行aggregateByKey,初始化新StatCounter,并使用StatCounter::merge的顺序和组合功能:这是方法recommended by this StackOverflow answer,避免从选项2.然而groupByKey,我一直没能找到好的文档PySpark中的StatCounter

我喜欢选项1,因为它使代码更可扩展的,因为它可以很容易地适应使用具有类似合同其他MLLib功能更复杂的计算,但如果Vector输入固有地要求该数据集在本地收集那么它会限制代码可以有效运行的数据大小。在另外两个之间,选项3 看起来效率更高,因为它避免了groupByKey,但我希望确认是这种情况。

有没有其他的选择我没有考虑过? (我目前使用Python + PySpark,但如果语言有差异,我也可以使用Java/Scala解决方案。)

+0

可能重复[在单个数据中发现带有pyspark的最小/最大值](http://stackoverflow.com/questions/36559809/finding-min-max-with-pyspark-in-single-pass-over -数据) –

回答

2

您可以试试reduceByKey。这是很简单的,如果我们只是想计算min()

rdd.reduceByKey(lambda x,y: min(x,y)).collect() 
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)] 

要计算mean,您首先需要创建(value, 1)元组,我们使用的reduceByKey操作同时计算sumcount。最后我们把它们通过彼此在mean到达:

meanRDD = (rdd 
      .mapValues(lambda x: (x, 1)) 
      .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) 
      .mapValues(lambda x: x[0]/x[1])) 

meanRDD.collect() 
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)] 

对于variance,您可以用公式(sumOfSquares/count) - (sum/count)^2, 我们通过以下方式转换:

varRDD = (rdd 
      .mapValues(lambda x: (1, x, x*x)) 
      .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2])) 
      .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2))) 

varRDD.collect() 
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)] 

我使用的值类型double而不是int中的虚拟数据准确地说明计算的平均值和方差:

rdd = sc.parallelize([("key1", 1.0), 
         ("key3", 9.0), 
         ("key2", 3.0), 
         ("key1", 4.0), 
         ("key1", 5.0), 
         ("key3", 2.0), 
         ("key2", 7.0)])