2016-08-03 57 views

回答

4

以下是Python中使用Dataframe API(Spark 1.6 +)的示例实现。

import pyspark.sql.functions as F 
import numpy as np 
from pyspark.sql.types import FloatType 

假设我们有“工资”火花数据帧,如为客户月薪:

月| customer_id |工资

,我们想找到贯穿所有月份

每个客户的平均年薪

第一步:写一个用户定义函数来计算平均

def find_median(values_list): 
    try: 
     median = np.median(values_list) #get the median of values in a list in each row 
     return round(float(median),2) 
    except Exception: 
     return None #if there is anything wrong with the given values 

median_finder = F.udf(find_median,FloatType()) 

第2步:在工资汇总列将它们收集到每行中的工资列表中:

salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries")) 

第3步:调用saraies col上的median_finder udf并添加中间值作为新列

salaries_list = salaries_list.withColumn("median",median_finder("salaries")) 
+1

使用np.nanmedian(values_list)忽略NaN并且有时是更好的选择 –

相关问题