2017-07-23 55 views
0

我创建了下面的rdd,我需要在同一个数据集上执行一系列过滤器来派生不同的计数器和聚合。一次评估多个过滤器

有没有一种方法可以应用这些过滤器并计算一次合计,避免火花经过同一个数据集多次?

val res = df.rdd.map(row => { 
    // ............... Generate data here for each row....... 
}) 

res.persist(StorageLevel.MEMORY_AND_DISK) 
val all = res.count() 

val stats1 = res.filter(row => row.getInt(1) > 0) 
val stats1Count = stats1.count() 
val stats1Agg = stats1.map(r => r.getInt(1)).mean() 

val stats2 = res.filter(row => row.getInt(2) > 0) 
val stats2Count = stats2.count() 
val stats2Agg = stats2.map(r => r.getInt(2)).mean() 
+0

为什么要转换为RDD?如果您将其作为数据框保留,则火花优化器将为您处理它。 –

回答

0

您可以使用集合:

case class Stats(count: Int = 0, sum: Int = 0) { 
    def mean = sum/count 
    def +(s: Stats): Stats = Stats(count + s.count, sum + s.sum) 
    def <- (n: Int) = if(n > 0) copy(count + 1, sum + n) else this 
} 

val (stats1, stats2) = res.aggregate(Stats() -> Stats()) (
    { (s, row) => (s._1 <- row.getInt(1), s._2 <- row.getInt(2)) }, 
    { _ + _ } 
) 

val (stat1Count, stats1Agg, stats2Count, stats2Agg) = (stats1.count, stats1.mean, stats2.count, stats2.mean)