2017-07-06 36 views
1

Group by with average function in Scala

Hi, I am completely new to Spark Scala. I need an idea or a sample solution. I have data like this:

tagid,timestamp,listner,orgid,suborgid,rssi 
[4,1496745915,718,4,3,0.30] 
[2,1496745915,3878,4,3,0.20] 
[4,1496745918,362,4,3,0.60] 
[4,1496745913,362,4,3,0.60] 
[2,1496745918,362,4,3,0.10] 
[3,1496745912,718,4,3,0.05] 
[2,1496745918,718,4,3,0.30] 
[4,1496745911,1901,4,3,0.60] 
[4,1496745912,718,4,3,0.60] 
[2,1496745915,362,4,3,0.30] 
[2,1496745912,3878,4,3,0.20] 
[2,1496745915,1901,4,3,0.30] 
[2,1496745910,1901,4,3,0.30] 

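For each tag and each listener, I want to find the data from the last 10 seconds of timestamps. Then, for that 10 seconds of data, I need to find the average of the rssi values. Something like this: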

2,1496745918,718,4,3,0.60 
2,1496745917,718,4,3,1.30 
2,1496745916,718,4,1,2.20 
2,1496745914,718,1,2,3.10 
2,1496745911,718,1,2,6.10 
4,1496745910,1901,1,2,0.30 
4,1496745908,1901,1,2,1.30 
.......................... 
.......................... 

That is the kind of result I need. Any solution or suggestion is appreciated. Note: I am using Spark with Scala.

I tried a Spark SQL query, but it does not work properly.

val filteravg = avg.registerTempTable("avg") 
val avgfinal = sqlContext.sql("SELECT tagid,timestamp,listner FROM (SELECT tagid,timestamp,listner,dense_rank() OVER (PARTITION BY _c6 ORDER BY _c5 ASC) as rank FROM avg) tmp WHERE rank <= 10") 
avgfinal.collect.foreach(println) 

I also tried doing it with arrays. Any help would be appreciated.

+0

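Do you need the last 10 seconds of data counted back from the current time, or from the maximum timestamp of each tag and each listener? –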

+0

The last 10 seconds of data for each tag and listener. Counted from the current time, or just the latest 10 records; either is fine. –

+0

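The average over the last 10 seconds for a given tag and listener would be a single value, not multiple values like the ones you listed (0.60, 1.30, 2.20, ...), wouldn't it? – vdep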

Answers

2

If you already have a dataframe such as

+-----+----------+-------+-----+--------+----+ 
|tagid|timestamp |listner|orgid|suborgid|rssi| 
+-----+----------+-------+-----+--------+----+ 
|4 |1496745915|718 |4 |3  |0.30| 
|2 |1496745915|3878 |4 |3  |0.20| 
|4 |1496745918|362 |4 |3  |0.60| 
|4 |1496745913|362 |4 |3  |0.60| 
|2 |1496745918|362 |4 |3  |0.10| 
|3 |1496745912|718 |4 |3  |0.05| 
|2 |1496745918|718 |4 |3  |0.30| 
|4 |1496745911|1901 |4 |3  |0.60| 
|4 |1496745912|718 |4 |3  |0.60| 
|2 |1496745915|362 |4 |3  |0.30| 
|2 |1496745912|3878 |4 |3  |0.20| 
|2 |1496745915|1901 |4 |3  |0.30| 
|2 |1496745910|1901 |4 |3  |0.30| 
+-----+----------+-------+-----+--------+----+ 

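then the following should work for you. Note that it partitions by tagid only and measures the 10 seconds back from each tag's newest timestamp.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, first}
// The $"..." column syntax also needs import spark.implicits._ (sqlContext.implicits._ on Spark 1.x)
df.withColumn("firstValue", first("timestamp") over Window.partitionBy("tagid").orderBy($"timestamp".desc)) 
    .filter($"firstValue".cast("long")-$"timestamp".cast("long") < 10) 
    .withColumn("average", avg("rssi") over Window.partitionBy("tagid")) 
    .drop("firstValue") 
    .show(false) 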

You should get the output

+-----+----------+-------+-----+--------+----+-------------------+ 
|tagid|timestamp |listner|orgid|suborgid|rssi|average   | 
+-----+----------+-------+-----+--------+----+-------------------+ 
|3 |1496745912|718 |4 |3  |0.05|0.05    | 
|4 |1496745918|362 |4 |3  |0.60|0.54    | 
|4 |1496745915|718 |4 |3  |0.30|0.54    | 
|4 |1496745913|362 |4 |3  |0.60|0.54    | 
|4 |1496745912|718 |4 |3  |0.60|0.54    | 
|4 |1496745911|1901 |4 |3  |0.60|0.54    | 
|2 |1496745918|362 |4 |3  |0.10|0.24285714285714288| 
|2 |1496745918|718 |4 |3  |0.30|0.24285714285714288| 
|2 |1496745915|3878 |4 |3  |0.20|0.24285714285714288| 
|2 |1496745915|362 |4 |3  |0.30|0.24285714285714288| 
|2 |1496745915|1901 |4 |3  |0.30|0.24285714285714288| 
|2 |1496745912|3878 |4 |3  |0.20|0.24285714285714288| 
|2 |1496745910|1901 |4 |3  |0.30|0.24285714285714288| 
+-----+----------+-------+-----+--------+----+-------------------+ 
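
The snippet above groups by tagid only and keeps every input row. Since the question asks for a value per tag and per listener, here is a minimal sketch of that variant, assuming the same column names and assuming that "last 10 seconds" means "within 10 seconds of the newest timestamp seen for that (tagid, listner) pair". The object name, the helper names `sampleData` and `averagePerTagAndListener`, the `windowSeconds` parameter, and the output column names are made up for illustration; they are not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, max}

object LastTenSecondsAverage {

  // Sample rows copied from the question; column names follow its header line.
  def sampleData(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (4, 1496745915L, 718, 4, 3, 0.30),
      (2, 1496745915L, 3878, 4, 3, 0.20),
      (4, 1496745918L, 362, 4, 3, 0.60),
      (4, 1496745913L, 362, 4, 3, 0.60),
      (2, 1496745918L, 362, 4, 3, 0.10),
      (3, 1496745912L, 718, 4, 3, 0.05),
      (2, 1496745918L, 718, 4, 3, 0.30),
      (4, 1496745911L, 1901, 4, 3, 0.60),
      (4, 1496745912L, 718, 4, 3, 0.60),
      (2, 1496745915L, 362, 4, 3, 0.30),
      (2, 1496745912L, 3878, 4, 3, 0.20),
      (2, 1496745915L, 1901, 4, 3, 0.30),
      (2, 1496745910L, 1901, 4, 3, 0.30)
    ).toDF("tagid", "timestamp", "listner", "orgid", "suborgid", "rssi")
  }

  // Assumption: the 10 seconds are counted back from the newest timestamp
  // of each (tagid, listner) pair, not from the wall-clock time.
  def averagePerTagAndListener(df: DataFrame, windowSeconds: Long = 10L): DataFrame = {
    val perPair = Window.partitionBy("tagid", "listner")
    df.withColumn("latest", max("timestamp").over(perPair))
      // Keep only rows that fall inside the window for their pair.
      .filter(col("latest") - col("timestamp") < windowSeconds)
      // Collapse each pair to one row holding a single average.
      .groupBy("tagid", "listner")
      .agg(max("timestamp").as("latest_timestamp"), avg("rssi").as("avg_rssi"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("last-10s-average")
      .getOrCreate()
    averagePerTagAndListener(sampleData(spark)).orderBy("tagid", "listner").show(false)
    spark.stop()
  }
}
```

This returns one row per (tagid, listner) pair, which matches the point raised in the comments that the average over a window is a single value. To count back from the current time instead, the `latest` column could be replaced by a literal epoch-seconds value.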
+0

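Will this show the 10 seconds of timestamp data for each tag and listener? One more thing... if I have fewer than 10 rssi records, I need to use the rssi values that exist. There is no need to sum them. –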

+0

I forgot to convert the timestamp to seconds. But the idea is the same. Let me see what I can do. –

+0

Sure... thanks for considering it! –
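
On the follow-up about record counts: if "the latest 10 records" per tag and listener is acceptable instead of a time window, one possible approach is to rank rows by timestamp within each pair and average the top ones. The sketch below is one reading of that comment, not something the answerer posted. The object name, the `averageOfLatest` helper, its `n` parameter, and the `rn`, `records_used` and `avg_rssi` column names are hypothetical. Pairs with fewer than `n` rows are simply averaged over whatever rows they have.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, count, row_number}

object LatestRecordsAverage {

  // Averages rssi over the n newest rows of each (tagid, listner) pair.
  // Rows with equal timestamps inside a pair are ranked in arbitrary order.
  def averageOfLatest(df: DataFrame, n: Int): DataFrame = {
    val newestFirst = Window
      .partitionBy("tagid", "listner")
      .orderBy(col("timestamp").desc)
    df.withColumn("rn", row_number().over(newestFirst))
      .filter(col("rn") <= n)
      .groupBy("tagid", "listner")
      // records_used shows how many rows actually went into each average.
      .agg(count("rssi").as("records_used"), avg("rssi").as("avg_rssi"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("latest-records-average")
      .getOrCreate()
    import spark.implicits._

    // A few rows taken from the question's sample data.
    val df = Seq(
      (4, 1496745915L, 718, 0.30),
      (4, 1496745912L, 718, 0.60),
      (2, 1496745918L, 362, 0.10),
      (2, 1496745915L, 362, 0.30),
      (3, 1496745912L, 718, 0.05)
    ).toDF("tagid", "timestamp", "listner", "rssi")

    averageOfLatest(df, 10).orderBy("tagid", "listner").show(false)
    spark.stop()
  }
}
```

Using `row_number` rather than `dense_rank` caps each pair at exactly `n` rows even when timestamps repeat; which of the two is right depends on how ties should be treated.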