
I have set up my data to compute sums of squares with aggregateByKey in PySpark:

a = sc.parallelize([((1,2),(10,20,[1,3])),((1,2),(30,40,[1]))])

and I need:

  • (1,2) will be the key.
  • Since I want to compute the streaming standard deviation of the first two values, I need the plain sum and the sum of squares for each of them. In other words, for the first value I need sum(x) = (10 + 30) and sum(x²) = (10² + 30²), and for the second value sum(y) = (20 + 40) and sum(y²) = (20² + 40²).
  • For the final value (the list), I just want to concatenate the lists.

The final result must be:

([(1,2),(40,1000,60,2000,[1,3])])
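(As a quick sanity check, the four numeric targets follow directly from the two input records:)

sum_x  = 10 + 30          # 40
sum_x2 = 10**2 + 30**2    # 1000
sum_y  = 20 + 40          # 60
sum_y2 = 20**2 + 40**2    # 2000

Note that a plain concatenation of [1,3] and [1] would give [1,3,1]; the [1,3] shown above suggests duplicates are meant to be dropped.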

Here is my code:

a.aggregateByKey((0.0,0.0,0.0,0.0,[]),
    (lambda x,y: (x[0]+y[0], x[0]*x[0]+y[0]*y[0], x[1]+y[1], x[1]*x[1]+y[1]*y[1], x[2]+y[2])),
    (lambda rdd1,rdd2: (rdd1[0]+rdd2[0], rdd1[1]+rdd2[1], rdd1[2]+rdd1[2], rdd1[3]+rdd2[3], rdd1[4]+rdd2[4]))).collect()

Unfortunately, this returns the following error: "TypeError: unsupported operand type(s) for +: 'float' and 'list'"

Can anyone shed some light on this?

Answer


You can work around this with a HiveContext:

from pyspark.sql.context import HiveContext 
hivectx = HiveContext(sc) 

a = sc.parallelize([((1,2),(10,20,[1,3])),((1,2),(30,40,[1]))]) 

# Convert this to a dataframe 
b = a.toDF(['col1','col2']) 

# Explode col2 into individual columns 
c = b.map(lambda x: (x.col1,x.col2[0],x.col2[1],x.col2[2])).toDF(['col1','col21','col22','col23']) 
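(A version note: on Spark 2.x, DataFrames no longer expose .map directly, so this step, and the d.map step further down, would have to go through .rdd first, e.g.:)

c = b.rdd.map(lambda x: (x.col1,x.col2[0],x.col2[1],x.col2[2])).toDF(['col1','col21','col22','col23'])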

c.registerTempTable('mydf') 

sql = """ 
select col1, 
sum(col21) as sumcol21, 
sum(POW(col21,2)) as sum2col21, 
sum(col22) as sumcol22, 
sum(POW(col22,2)) as sum2col22, 
collect_set(col23) as col23 
from mydf 
group by col1 
""" 
d = hivectx.sql(sql) 

# Get back your original dataframe 
e = d.map(lambda x:(x.col1,(x.sumcol21,x.sum2col21,x.sumcol22,x.sum2col22,[item for sublist in x.col23 for item in sublist]))).toDF(['col1','col2'])
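Alternatively, the original aggregateByKey call can be fixed directly. The TypeError comes from the seqFunc mixing up accumulator and value positions: the accumulator has five slots, but the lambda indexes it as if it were the raw three-slot value, so the float 0.0 in x[2] gets added to the list y[2] (the combFunc also has a typo, rdd1[2]+rdd1[2]). A minimal sketch that keeps the two layouts apart, assuming plain list concatenation rather than collect_set-style deduplication:

# Accumulator layout: (sum_x, sum_x2, sum_y, sum_y2, merged_list)
zero = (0.0, 0.0, 0.0, 0.0, [])

def seq_func(acc, val):
    x, y, lst = val  # each value is (x, y, list)
    return (acc[0] + x, acc[1] + x * x,
            acc[2] + y, acc[3] + y * y,
            acc[4] + lst)

def comb_func(a1, a2):
    # merge two partial accumulators slot by slot
    return (a1[0] + a2[0], a1[1] + a2[1],
            a1[2] + a2[2], a1[3] + a2[3],
            a1[4] + a2[4])

a.aggregateByKey(zero, seq_func, comb_func).collect()
# [((1, 2), (40.0, 1000.0, 60.0, 2000.0, [1, 3, 1]))]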