
I'm trying to reduce time-series data so that the values occurring within the same hour are collected into arrays (to then find the max, min and mean). In other words, I want Spark's reduceByKey to reduce only under certain conditions.

It doesn't look like I can supply a condition inside the reduce block that determines whether the reduce should happen (append the value to the array) or be skipped.

//data 
//ID, VAL, DATETIME 
tvFile.map((x) =>
    (x.split(',')(0), (Array(x.split(',')(1)), Array(x.split(',')(2))))) // (ID, ([VAL], [DATETIME]))
  .reduceByKey((a, b) => {
    val dt1 = DateTime.parse(a._2(0))
    val dt2 = DateTime.parse(b._2(0))
    if ((dt1.getDayOfYear == dt2.getDayOfYear) && (dt1.getHourOfDay == dt2.getHourOfDay))
      (a._1 ++ b._1, a._2 ++ b._2)
    else
      // NOT SURE WHAT TO DO HERE
  }).collect

The above probably isn't the most efficient or correct way to do this; I'm just getting started with Spark/Scala.

Answer


The approach should be to prepare the data so that it carries a key that partitions it for aggregation. Following the code in the question, in this case the key should be (id, day-of-year, hour-of-day).

Once the data is keyed correctly, the aggregation is trivial.

Example:

import org.joda.time.DateTime  // DateTime.parse below comes from joda-time

val sampleData = Seq("p1,38.1,2016-11-26T11:15:10", 
         "p1,39.1,2016-11-26T11:16:10", 
         "p1,35.8,2016-11-26T11:17:10", 
         "p1,34.1,2016-11-26T11:18:10", 
         "p2,37.2,2016-11-26T11:16:00", 
         "p2,31.2,2016-11-27T11:17:00", 
         "p2,31.6,2016-11-27T11:17:00", 
         "p1,39.4,2016-11-26T12:15:10", 
         "p2,36.3,2016-11-27T10:10:10", 
         "p1,39.5,2016-11-27T12:15:00", 
         "p3,36.1,2016-11-26T11:15:10") 

val sampleDataRdd = sparkContext.parallelize(sampleData)       

val records = sampleDataRdd.map{line => 
          val parts = line.split(",") 
          val id = parts(0) 
          val value = parts(1).toDouble 
          val dateTime = DateTime.parse(parts(2)) 
          val doy = dateTime.getDayOfYear 
          val hod = dateTime.getHourOfDay 
          ((id, doy, hod), value) 
          } 

val aggregatedRecords = records.reduceByKey(_ + _)        
aggregatedRecords.collect 
// Array[((String, Int, Int), Double)] = Array(((p1,331,11),147.10000000000002), ((p2,332,11),62.8), ((p2,331,11),37.2), ((p1,332,12),39.5), ((p2,332,10),36.3), ((p1,331,12),39.4), ((p3,331,11),36.1)) 
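
Since the question ultimately wants max, min and mean per hour rather than a plain sum, here is a hedged sketch (not part of the original answer) that reuses the `records` RDD above but carries a (min, max, sum, count) accumulator through reduceByKey instead of building arrays; the name `stats` and the tuple layout are illustrative:

// Illustrative: fold each value into a (min, max, sum, count) accumulator per key.
val stats = records
  .mapValues(v => (v, v, v, 1L))                        // seed the accumulator from a single value
  .reduceByKey { case ((mn1, mx1, s1, n1), (mn2, mx2, s2, n2)) =>
    (math.min(mn1, mn2), math.max(mx1, mx2), s1 + s2, n1 + n2)
  }
  .mapValues { case (mn, mx, s, n) => (mn, mx, s / n) } // (min, max, mean)

stats.collect

If the raw values are still needed as arrays (as in the question), `records.groupByKey.mapValues(_.toArray)` with the same key achieves that, at the cost of shuffling every individual value.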

This is also a lot easier with Spark DataFrames. The answer uses the RDD API to match how the question was asked.
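
For completeness, a minimal sketch of the DataFrame route mentioned above, assuming Spark 2.x in a spark-shell (so a SparkSession named `spark` is available); the column names `id`, `value`, `ts` and the fixed 1-hour window are illustrative choices, not from the original answer:

import org.apache.spark.sql.functions._
import spark.implicits._

// Parse the raw CSV lines into a typed DataFrame.
val df = sampleData
  .map(_.split(","))
  .map(p => (p(0), p(1).toDouble, java.sql.Timestamp.valueOf(p(2).replace("T", " "))))
  .toDF("id", "value", "ts")

// Group by id and a 1-hour time window, then compute the wanted statistics.
val hourly = df
  .groupBy($"id", window($"ts", "1 hour"))
  .agg(min($"value").as("min"), max($"value").as("max"), avg($"value").as("avg"))

hourly.show(false)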


@tamersalama see that it is also available as a notebook: https://gist.github.com/maasg/e470654d15a73a1cc1a280e37561a8a5 – maasg