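Split Spark dataframe and calculate average based on one column value
I have two dataframes. The first dataframe, classRecord, has 10 different entries like the following: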
Class, Calculation
first, Average
Second, Sum
Third, Average
The second dataframe, studentRecord, has around 50K entries like the following:
Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second
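From the second dataframe, based on the class type, I want to run a calculation on height (average for class first, sum for class Second, and so on) separately for each camp (for example, if the class is first, the average for yellow, the average for white, etc.). I tried the following: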
// function to calculate average
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
  var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1 / y._2).collect
  return avg_by_key
}

// required schema for further modifications
val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("avg", DoubleType, false) :: Nil)

// for each loop on each class type
classRecord.rdd.foreach{
  // filter students based on camps
  var campYellow = studentRecord.filter($"Camp" === "yellow")
  var campWhite = studentRecord.filter($"Camp" === "white")
  var campRed = studentRecord.filter($"Camp" === "red")

  // since I know that calculation for first class is average, so representing calculation only for class first
  val avgcampYellow = averageOnName(campYellow)
  val avgcampWhite = averageOnName(campWhite)
  val avgcampRed = averageOnName(campRed)

  // union of all
  val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  // conversion of rdd to frame
  var dfYellow = sqlContext.createDataFrame(rddYellow, schema)

  // union with yellow camp data
  val rddWhite = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  // conversion of rdd to frame
  var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  var dfYellWhite = dfYellow.union(dfWhite)

  // union with yellow, white camp data
  val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  // conversion of rdd to frame
  var dfRed = sqlContext.createDataFrame(rddRed, schema)
  var dfYellWhiteRed = dfYellWhite.union(dfRed)

  // other modifications and final result to hive
}
Here is where I am struggling:
1. Hardcoding yellow, red and white; there may be other camp types as well.
2. Filtering the same dataframe many times.
3. Not being able to figure out how to calculate differently according to the class calculation type.
Any help is appreciated. Thanks.
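If I understand correctly, you want the average or the sum of height depending on Camp and Class? How about computing the values for all camp/class combinations, putting them in one dataframe, and then reading the 'classRecord' df separately? – Shaido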
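
The suggestion in the comment above could be sketched roughly as follows. This is only a minimal illustration, not a tested production solution: the object name CampClassAggregation, the function name aggregateByClassAndCamp and the output column names (avg_height, sum_height, result) are hypothetical, and it assumes the Calculation column contains only the values "Average" and "Sum" as in the sample data. It groups studentRecord once by Class and Camp (so no camp is hardcoded and the frame is not filtered repeatedly), computes both aggregates, joins with classRecord, and picks the aggregate named in Calculation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, col, sum, when}

object CampClassAggregation {

  // Hypothetical helper: one row per (Class, Camp) with the aggregate
  // selected by classRecord.Calculation.
  def aggregateByClassAndCamp(studentRecord: DataFrame, classRecord: DataFrame): DataFrame = {
    // Single pass over studentRecord: both candidate aggregates per group.
    val aggregated = studentRecord
      .groupBy("Class", "Camp")
      .agg(
        avg(col("height")).as("avg_height"),
        // Cast so both branches of the `when` below have the same type.
        sum(col("height")).cast("double").as("sum_height")
      )

    // Inner join: classes missing from classRecord are dropped.
    aggregated
      .join(classRecord, Seq("Class"))
      .withColumn(
        "result",
        // Assumption: only "Average" and "Sum" occur; anything else yields null.
        when(col("Calculation") === "Average", col("avg_height"))
          .when(col("Calculation") === "Sum", col("sum_height"))
      )
      .select("Class", "Camp", "Calculation", "result")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; a real job would reuse its existing session.
    val spark = SparkSession.builder().master("local[*]").appName("camp-class-agg").getOrCreate()
    import spark.implicits._

    val classRecord = Seq(
      ("first", "Average"),
      ("Second", "Sum"),
      ("Third", "Average")
    ).toDF("Class", "Calculation")

    val studentRecord = Seq(
      ("Shae", 152, "yellow", "first"),
      ("Joe", 140, "yellow", "first"),
      ("Mike", 149, "white", "first"),
      ("Anne", 142, "red", "first"),
      ("Tim", 154, "red", "Second"),
      ("Jake", 153, "white", "Second"),
      ("Sherley", 153, "white", "Second")
    ).toDF("Name", "height", "Camp", "Class")

    aggregateByClassAndCamp(studentRecord, classRecord).show()
    spark.stop()
  }
}
```

Since classRecord has only about 10 rows, the join should be cheap compared with the aggregation. If more calculation types exist, each would need its own aggregate column and an extra `when` branch.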