
I want to compute cumulative counts at different time steps. I have a count of the events that occurred in each time period t; now I want the cumulative number of events up to and including each period, computed without hard-coding the `t` values.

I can easily compute each cumulative count separately, but it is tedious. I could also append the results back together with unionAll, but that is just as tedious, and there are many time periods (see the sketch after the example output below).

How can I do this more cleanly?

package main.scala

import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object Test {

  def main(args: Array[String]) {

    // Spark and SQL Context (gives access to Spark and Spark SQL libraries)
    val conf = new SparkConf().setAppName("Merger")
    val sc = new SparkContext(conf)
    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    // Count
    val count = Seq(("A",1,1),("A",1,2),("A",0,3),("A",0,4),("A",0,5),("A",1,6),
        ("B",1,1),("B",0,2),("B",0,3),("B",1,4),("B",0,5),("B",1,6))
      .toDF("id","count","t")

    val count2 = count.filter('t <= 2).groupBy('id).agg(sum("count"), max("t"))

    val count3 = count.filter('t <= 3).groupBy('id).agg(sum("count"), max("t"))

    count.show()
    count2.show()
    count3.show()
  }
}

count

+---+-----+---+
| id|count|  t|
+---+-----+---+
|  A|    1|  1|
|  A|    1|  2|
|  A|    0|  3|
|  A|    0|  4|
|  A|    0|  5|
|  A|    1|  6|
|  B|    1|  1|
|  B|    0|  2|
|  B|    0|  3|
|  B|    1|  4|
|  B|    0|  5|
|  B|    1|  6|
+---+-----+---+

count2

+---+----------+------+
| id|sum(count)|max(t)|
+---+----------+------+
|  A|         2|     2|
|  B|         1|     2|
+---+----------+------+

count3

+---+----------+------+
| id|sum(count)|max(t)|
+---+----------+------+
|  A|         2|     3|
|  B|         1|     3|
+---+----------+------+
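
For reference, the unionAll workaround described in the question can at least avoid hard-coding `t` by folding over the distinct timesteps; it still runs one filter/aggregate per timestep, which is exactly what the answers below avoid. A rough sketch (the variable names are mine, not from the question):

// Sketch: one aggregation per distinct timestep, glued back together with unionAll
val timesteps = count.select("t").distinct().collect().map(_.getInt(0)).sorted

val cumulative = timesteps.map { ts =>
    count.filter('t <= ts).groupBy('id).agg(sum("count"), max("t"))
  }.reduce(_ unionAll _)

cumulative.show()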

Answers

0

I tested this with Spark 1.5.2/Scala 2.10 and with Spark 2.0.0/Scala 2.11, and it works like a charm. It does not work with Spark 1.6.2, and I suspect that is because it was not compiled with Hive support.

package main.scala

import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext

object Test {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Test")
    val sc = new SparkContext(conf)
    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    val data = Seq(("A",1,1,1),("A",3,1,3),("A",0,0,2),("A",4,0,4),("A",0,0,6),("A",2,1,5),
        ("B",0,1,3),("B",0,0,4),("B",2,0,1),("B",2,1,2),("B",0,0,6),("B",1,1,5))
      .toDF("id","param1","param2","t")
    data.show()

    data.withColumn("cumulativeSum1", sum("param1").over(Window.partitionBy("id").orderBy("t")))
      .withColumn("cumulativeSum2", sum("param2").over(Window.partitionBy("id").orderBy("t")))
      .show()
  }
}

An improvement I am still working on is applying this to several columns at once instead of repeating withColumn. Input welcome!
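
One way to do that, not part of the original answer, is to fold over the column names so each one gets the same window sum; a minimal sketch, assuming the `data` DataFrame above (the column list and output names are mine):

// Sketch: apply the same cumulative window sum to several columns by folding
// over their names instead of repeating withColumn by hand.
val w = Window.partitionBy("id").orderBy("t")
val cumulativeCols = Seq("param1", "param2")   // columns to accumulate (assumed)

val result = cumulativeCols.foldLeft(data) { (df, c) =>
  df.withColumn(s"cumulativeSum_$c", sum(c).over(w))
}

result.show()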

0

The approach I suggest is to de-normalize the data so that the accumulation can be done in a single step. This code should also scale well (there is only one collect to the driver).

Sorry that my example does not use the DataFrame API (my Spark installation is slightly borked, so I could not test DataFrames):

val count = sc.makeRDD(Seq(("A",1,1),("A",1,2),("A",0,3),("A",0,4),("A",0,5),("A",1,6), 
    ("B",1,1),("B",0,2),("B",0,3),("B",1,4),("B",0,5),("B",1,6))) 

// this is required only if number of timesteps is not known, this is the only operation that collects data to driver, and could even be broadcasted if large 
val distinctTimesteps = count.map(_._3).distinct().sortBy(e => e, true).collect() 

// this actually de-normalizes data so that it can be cumulated 
val deNormalizedData = count.flatMap { case (id, c, t) => 
    // the trick is making composite key consisting of distinct timestep and your id: (distTimestep, id) 
    distinctTimesteps.filter(distTimestep => distTimestep >= t).map(distTimestep => (distTimestep, id) -> c) 
} 

// just reduce by composite key and you are done 
val cumulativeCounts = deNormalizedData.reduceByKey(_ + _) 

// test 
cumulativeCounts.collect().foreach(print)
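
The same de-normalization idea can also be written against the DataFrame API as a range join on the distinct timesteps. This is an untested sketch (the column and variable names are mine); note that in Spark 2.x the non-equi join may be flagged as a cartesian product and require spark.sql.crossJoin.enabled=true:

// Sketch: join every row to each timestep >= its own t, then sum per (id, timestep).
import sqlContext.implicits._
import org.apache.spark.sql.functions._

val countDF = count.toDF("id", "count", "t")        // the RDD defined above
val timesteps = countDF.select($"t".as("ts")).distinct()

val cumulativeCounts = countDF
  .join(timesteps, $"t" <= $"ts")                   // non-equi (range) join
  .groupBy($"id", $"ts")
  .agg(sum($"count").as("cumulativeCount"))
  .orderBy($"id", $"ts")

cumulativeCounts.show()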