
Splitting a Spark dataframe and calculating based on a column value

I have two dataframes. The first dataframe, classRecord, has 10 different entries like the following:

Class, Calculation 
first, Average 
Second, Sum 
Third, Average 

The second dataframe, studentRecord, has around 50K entries like the following:

Name, height, Camp, Class 
Shae, 152, yellow, first 
Joe, 140, yellow, first 
Mike, 149, white, first 
Anne, 142, red, first 
Tim, 154, red, Second 
Jake, 153, white, Second 
Sherley, 153, white, Second 

From the second dataframe, I want to compute heights per camp, with the aggregation chosen by the class's calculation type (class first: average; class Second: sum; and so on). For example, since the class first uses an average, the yellow camp of class first gives (152 + 140) / 2 = 146. I tried the following:

import org.apache.spark.rdd.RDD

// Function to calculate the average height per key (the key here is Name)
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
    val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
    pairedRDD.mapValues(x => (x, 1))
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .mapValues(y => 1.0 * y._1 / y._2)
        .collect()
}

// Required schema for further modifications
import org.apache.spark.sql.types._

val schema = StructType(
    StructField("name", StringType, false) ::
    StructField("avg", DoubleType, false) :: Nil)

// For-each loop over each class type. Collected to the driver first, since
// sc and sqlContext cannot be used inside an RDD foreach on the executors.
classRecord.collect().foreach { classRow =>
    // classRow holds (Class, Calculation) for one class type

    // Filter students based on camps (camp names are hardcoded here)
    val campYellow = studentRecord.filter($"Camp" === "yellow")
    val campWhite = studentRecord.filter($"Camp" === "white")
    val campRed = studentRecord.filter($"Camp" === "red")

    // Since the calculation for class first is an average, only that
    // calculation is shown here
    val avgcampYellow = averageOnName(campYellow)
    val avgcampWhite = averageOnName(campWhite)
    val avgcampRed = averageOnName(campRed)

    // Convert each per-camp result to an RDD of Rows, then to a dataframe
    val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    val dfYellow = sqlContext.createDataFrame(rddYellow, schema)
    val rddWhite = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    val dfWhite = sqlContext.createDataFrame(rddWhite, schema)
    // Union of the yellow and white camp data
    val dfYellWhite = dfYellow.union(dfWhite)
    val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
    val dfRed = sqlContext.createDataFrame(rddRed, schema)
    // Union of all three camps
    val dfYellWhiteRed = dfYellWhite.union(dfRed)
    // Other modifications and final result to Hive
}

Here is where I am struggling:

1. Hardcoding yellow, red, and white; there may be other camp types as well (a sketch of reading the camps dynamically follows this list).
2. Filtering the same dataframe many times.
3. Not being able to figure out how to calculate differently according to each class's calculation type.
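
For issue 1, one idea is to read the distinct camp values from the data instead of hardcoding them. This is only a minimal sketch of that idea (assuming spark.implicits._ is in scope), not code from the original post:

val camps = studentRecord.select("Camp").distinct().as[String].collect()
camps.foreach { camp =>
    // campDf plays the role of campYellow/campWhite/campRed above
    val campDf = studentRecord.filter($"Camp" === camp)
    // ... run the per-camp calculation here
}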

Help is appreciated. Thanks.


If I understand correctly, you want the average or sum of the heights depending on Camp and Class? How about calculating it for all camp/class combinations, putting that in one dataframe, and then reading the classRecord df separately? – Shaido

Answer


You can simply do the average and sum calculations for all combinations of Class/Camp, then parse the classRecord dataframe separately and extract the rows you need. This is easily done by using the groupBy() method and aggregating the values.

Using your example dataframes:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

studentRecord.show() 

+-------+------+------+------+ 
| Name|height| Camp| Class| 
+-------+------+------+------+ 
| Shae| 152|yellow| first| 
| Joe| 140|yellow| first| 
| Mike| 149| white| first| 
| Anne| 142| red| first| 
| Tim| 154| red|Second| 
| Jake| 153| white|Second| 
|Sherley| 153| white|Second| 
+-------+------+------+------+ 

val df = studentRecord.groupBy("Class", "Camp").agg(
    sum($"height").as("Sum"), 
    avg($"height").as("Average"), 
    collect_list($"Name").as("Names")) 
df.show() 

+------+------+---+-------+---------------+ 
| Class| Camp|Sum|Average|   Names| 
+------+------+---+-------+---------------+ 
| first| white|149| 149.0|   [Mike]| 
| first| red|142| 142.0|   [Anne]| 
|Second| red|154| 154.0|   [Tim]| 
|Second| white|306| 153.0|[Jake, Sherley]| 
| first|yellow|292| 146.0| [Shae, Joe]| 
+------+------+---+-------+---------------+ 

After doing this, you can simply check your classRecord dataframe and take the rows you need afterwards. Here is an example of what it can look like; it can be changed to fit your actual needs:

// Collects the classRecord dataframe as an Array[(String, String)]
val classRecs = classRecord.collect().map{ case Row(clas: String, calc: String) => (clas, calc) }

for (classRec <- classRecs) {
    val clas = classRec._1
    val calc = classRec._2

    // Matches which calculation you want to do
    val df2 = calc match {
        case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
        case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
    }

    // Do something with df2
}
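
If the per-class results then need to be combined into one dataframe (for example before writing the final result to Hive, as in the original attempt), one possible follow-up is to rename the chosen aggregate to a common column and union the pieces, using col from org.apache.spark.sql.functions. This is only a hedged sketch; the column name "Value" is an assumption, not part of the answer:

// Sketch: give every per-class result the same schema, then union them all
val results = classRecs.map { case (clas, calc) =>
    // calc is "Average" or "Sum", matching the column names in df
    df.filter($"Class" === clas).select($"Class", $"Camp", $"Names", col(calc).as("Value"))
}.reduce(_ union _)
results.show()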

Hope it helps!


This is partly what I need; I also need all the names that fall into each case, like "Class, camp, names, average". And even once I get the final DF, how will I decide that for first I need to select the average (discarding the sum), for Second the sum (discarding the average), and so on? – Swati


I also tried the above solution, and it shows an error: value groupby is not a member of org.apache.spark.rdd.RDD[String]. Thanks. – Swati


@Swati Sorry, it should be groupBy() with a capital B. I have also added the list of names to the solution. – Shaido