2017-07-11 53 views
1

I need help with the logic for iterating over records within a limited range. I have records like this:

tag,timestamp,listner,org,suborg,rssi        
4,101,1901,4,3,0.60                                          
4,110,1901,4,3,0.90 
4,104,1901,4,3,0.30 
4,109,1901,4,3,0.40 
4,111,1901,4,3,0.60               
4,128,1901,4,3,0.40 
4,129,1901,4,3,0.80 
4,131,1901,4,3,0.60                 
4,133,1901,4,3,0.30 
4,143,1901,4,3,0.60                 
4,147,1901,4,3,0.70 
4,148,1901,4,3,0.40 
4,149,1901,4,3,0.30 
4,150,1901,4,3,0.90 

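For each record, I have to find the average of the rssi column over the records from the preceding 10 seconds of timestamps.

This is my expected output: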

tagShortID,timestamp,listenerShortID,rootOrgID,subOrgID,rssi_Weight,rssi_Weight_avg 
4,150,1901,4,3,0.9,0.58 
4,149,1901,4,3,0.3,0.5 
4,148,1901,4,3,0.4,0.56 
4,147,1901,4,3,0.7,0.64 
4,143,1901,4,3,0.6,0.44 
4,133,1901,4,3,0.3,0.525 
4,131,1901,4,3,0.6,0.6 
4,129,1901,4,3,0.8,0.6 
4,128,1901,4,3,0.4,0.4 
4,111,1901,4,3,0.6,0.6 
4,110,1901,4,3,0.9,0.9 
4,109,1901,4,3,0.4,0.4 
4,104,1901,4,3,0.3,0.3 
4,101,1901,4,3,0.6,0.6 

I tried this:

df.withColumn("firstValue", first("Timestamp") over Window.orderBy($"Timestamp".desc).partitionBy("tagShortID", "ListenerShortID")) 
.filter($"firstValue".cast("long")-$"Timestamp".cast("long") <= 10) 
.withColumn("count", count("Timestamp") over Window.partitionBy("tagShortID", "ListenerShortID")) 
.withColumn("RSSI_Weight", when($"count" >= 10, avg($"RSSI_Weight") over Window.orderBy("Timestamp").partitionBy("tagShortID", "ListenerShortID").rowsBetween(Long.MinValue, 0)) otherwise($"RSSI_Weight")) 
.drop("firstValue", "count") 
.show(30, false) 

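The code above only looks at the highest timestamp and subtracts 10 seconds from it. What I need is to iterate over every timestamp and check its own 10-second window: if the check passes, take the average; otherwise use the row's own rssi value.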
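To make the rule concrete, here is a minimal plain-Scala sketch (no Spark) applied to the sample rows above. The helper name `trailingAvg` and the `windowSecs` parameter are hypothetical, and the "keep the raw value when no record is older than the window" branch is an assumption inferred from the expected output, where the rows at timestamps 101 to 111 keep their own rssi.

```scala
// (timestamp, rssi) pairs for the single tag/listener pair in the sample data.
val rows: Seq[(Long, Double)] = Seq(
  101L -> 0.60, 110L -> 0.90, 104L -> 0.30, 109L -> 0.40, 111L -> 0.60,
  128L -> 0.40, 129L -> 0.80, 131L -> 0.60, 133L -> 0.30, 143L -> 0.60,
  147L -> 0.70, 148L -> 0.40, 149L -> 0.30, 150L -> 0.90
)

// Hypothetical helper: average the rssi of all rows in [ts - windowSecs, ts],
// but only when some row is older than that window (assumed reading of the rule);
// otherwise return the row's own rssi unchanged.
def trailingAvg(rows: Seq[(Long, Double)], ts: Long, rssi: Double, windowSecs: Long = 10L): Double = {
  val hasOlderRow = rows.exists { case (t, _) => t < ts - windowSecs }
  if (!hasOlderRow) rssi
  else {
    val inWindow = rows.collect { case (t, r) if t >= ts - windowSecs && t <= ts => r }
    inWindow.sum / inWindow.size
  }
}

// Newest first, as in the expected output.
val result = rows.sortBy { case (t, _) => -t }.map { case (t, r) => (t, r, trailingAvg(rows, t, r)) }
result.foreach { case (t, r, a) => println(s"$t,$r,$a") }
```

For timestamp 150 the window [140, 150] contains the rows at 143, 147, 148, 149 and 150, so the average is (0.6 + 0.7 + 0.4 + 0.3 + 0.9) / 5 = 0.58, up to floating-point rounding.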

Any help would be appreciated.

Answers

1

You can use the following logic on an RDD to get the dataframe you need:

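import scala.util.control.Breaks._

// Appends the trailing 10-second average of column 5 (rssi) to `list`.
// `buffer` must hold the rows of one (tag, listener) group sorted by
// timestamp in descending order, because the scan stops at the first
// row that is older than the window.
def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
    val currentTimeStamp = list(1).toLong
    var sum = 0.0
    var count = 0
    var check = false // becomes true once a row older than the window is seen
    breakable {
        for (array <- buffer) {
            val toCheckTimeStamp = array(1).toLong
            // Row lies inside [currentTimeStamp - 10, currentTimeStamp]
            if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
                sum += array(5).toDouble
                count += 1
            }
            // Row is older than the window, so a full 10 seconds of history exists
            if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
                check = true
                break()
            }
        }
    }
    // Average only when a full window is available; otherwise keep the raw rssi
    if (sum != 0.0 && check) list :+ sum/count
    else list :+ list(5).toDouble
}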

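import sqlContext.implicits._
// Assumes the csv file contains data rows only (no header line).
val averageDF = sc.textFile("path to your csv file")
    .map(line => line.split(",").map(_.trim))
    .groupBy(array => (array(0), array(2)))   // one group per (tag, listener)
    .mapValues(buffer => {
        // Sort numerically, newest first, inside the group: groupBy does not
        // guarantee row order, and avgCalc relies on descending timestamps.
        val sorted = buffer.toSeq.sortBy(array => -array(1).toLong)
        sorted.map(list => avgCalc(sorted, list))
    })
    .flatMap(x => x._2)
    .map(x => Jessi(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF

averageDF.show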

where Jessi is a case class:

case class Jessi(tagShortID: String, Timestamp: Long, ListenerShortID: String, rootOrgID: String, subOrgID: String, RSSI_Weight: Double, RSSI_Weight_avg: Double) 

So you should get the following output:

+----------+---------+---------------+---------+--------+-----------+-------------------+
|tagShortID|Timestamp|ListenerShortID|rootOrgID|subOrgID|RSSI_Weight|RSSI_Weight_avg    |
+----------+---------+---------------+---------+--------+-----------+-------------------+
|4         |150      |1901           |4        |3       |0.9        |0.58               |
|4         |149      |1901           |4        |3       |0.3        |0.5                |
|4         |148      |1901           |4        |3       |0.4        |0.5666666666666668 |
|4         |147      |1901           |4        |3       |0.7        |0.6499999999999999 |
|4         |143      |1901           |4        |3       |0.6        |0.44999999999999996|
|4         |133      |1901           |4        |3       |0.3        |0.525              |
|4         |131      |1901           |4        |3       |0.6        |0.6                |
|4         |129      |1901           |4        |3       |0.8        |0.6000000000000001 |
|4         |128      |1901           |4        |3       |0.4        |0.4                |
|4         |111      |1901           |4        |3       |0.6        |0.6                |
|4         |110      |1901           |4        |3       |0.9        |0.9                |
|4         |109      |1901           |4        |3       |0.4        |0.4                |
|4         |104      |1901           |4        |3       |0.3        |0.3                |
|4         |101      |1901           |4        |3       |0.6        |0.6                |
+----------+---------+---------------+---------+--------+-----------+-------------------+
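
As an alternative sketch, the same rule can probably be expressed directly on a DataFrame with a range-based window frame, which avoids the manual loop. This is not the approach used above; it assumes Spark 2.x or later (a `SparkSession`), a numeric `Timestamp` column, and the same reading of the rule (average only when the group has a row older than the 10-second window). The names `byTagListener`, `last10s` and `firstTs` are made up for the illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, min, when}

val spark = SparkSession.builder().master("local[*]").appName("trailing-avg").getOrCreate()
import spark.implicits._

// Sample rows from the question, with the column names of the expected output.
val df = Seq(
  ("4", 101L, "1901", "4", "3", 0.60), ("4", 110L, "1901", "4", "3", 0.90),
  ("4", 104L, "1901", "4", "3", 0.30), ("4", 109L, "1901", "4", "3", 0.40),
  ("4", 111L, "1901", "4", "3", 0.60), ("4", 128L, "1901", "4", "3", 0.40),
  ("4", 129L, "1901", "4", "3", 0.80), ("4", 131L, "1901", "4", "3", 0.60),
  ("4", 133L, "1901", "4", "3", 0.30), ("4", 143L, "1901", "4", "3", 0.60),
  ("4", 147L, "1901", "4", "3", 0.70), ("4", 148L, "1901", "4", "3", 0.40),
  ("4", 149L, "1901", "4", "3", 0.30), ("4", 150L, "1901", "4", "3", 0.90)
).toDF("tagShortID", "Timestamp", "ListenerShortID", "rootOrgID", "subOrgID", "RSSI_Weight")

val byTagListener = Window.partitionBy("tagShortID", "ListenerShortID")
// Range frame: every row whose Timestamp lies in [current - 10, current].
val last10s = byTagListener.orderBy("Timestamp").rangeBetween(-10, 0)

val result = df
  // Earliest timestamp of the group, used to detect a full 10 seconds of history.
  .withColumn("firstTs", min("Timestamp").over(byTagListener))
  .withColumn("RSSI_Weight_avg",
    when(col("Timestamp") - col("firstTs") > 10, avg("RSSI_Weight").over(last10s))
      .otherwise(col("RSSI_Weight")))
  .drop("firstTs")
  .orderBy(col("Timestamp").desc)

result.show(30, false)
```

Because `rangeBetween` works on the values of the ordering column rather than on row positions, the frame always covers exactly 10 seconds regardless of how many rows fall inside it.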
+0

Great solution

+0

If it helped, please accept the answer :)