2015-12-03 19 views
1

我有一个RDD,我想在RDD的列中的某一列中找到standard deviation。我当前的代码是:如何在PySpark中的RDD中找到列中的标准偏差

def extract(line): 
    # line[11] is the column in which I want to find standard deviation 
    return (line[1],line[2],line[5],line[6],line[8],line[10],line[11]) 

inputfile1 = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line) 


data = (inputfile1 
    .map(lambda line: line.split(";")) 
    .filter(lambda line: len(line) >1) 
    .map(extract)) # Map to tuples 

dataRDD在我的最后一列(列6)具有其中我想找到standard deviation值。我怎么找到它?

UPDATE:我当前的代码:

def extract(line): 
    # last column is numeric but in string format 
    return ((float(line[-1]))) 


input = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line) 

Data = (input 
    .map(lambda line: line.split(";")) 
    .filter(lambda line: len(line) >1) 
    .map(extract)) # Map to tuples 

row = Row("val") 
df = Data.map(row).toDF() 
df.map(lambda r: r.x).stdev() 

当我运行此我得到的错误是:在df.map(lambda r: r.x).stdev()AttributeError: x注:一些在我的数据值是负

回答

5

火花< 1.6.0你有几个选择:

  • 转换为RDD并使用stdev方法:

    from pyspark.sql import Row 
    import numpy as np 
    
    row = Row("x") 
    
    df = sc.parallelize([row(float(x)) for x in np.random.randn(100)]).toDF() 
    df.rdd.map(lambda r: r.x).stdev() 
    
  • 使用以下公式(here is a Scala版本):

    enter image description here

    from pyspark.sql.functions import avg, pow, col, sqrt, lit 
    
    sd = sqrt(
        avg(col("x") * col("x")) - pow(avg(col("x")), lit(2))).alias("stdev") 
    
    df.select(sd) 
    
  • 蜂巢UDF:

    df.registerTempTable("df") 
    
    sqlContext.sql("SELECT stddev(x) AS sd FROM df") 
    

火花1.6.0介绍stddevstddev_sampstddev_pop功能。