我是新手火花斯卡拉,我为问愚蠢的问题(如果是)而道歉。我被困在一个问题,我简化如下:星火斯卡拉 - 如何计数()通过两行空调

有一个数据框有三列,“machineID”是机器的身份。 “startTime”是任务的开始时间戳。 “endTime”是任务的结束时间戳。


machineID, startTime, endTime 
1, 0, 3 
1, 4, 8 
1, 10, 20 
1, 20, 31 
1, 412, 578 
2, 231, 311 
2, 781, 790 

我正在使用spark 2.0.1和scala 2.11.8



要访问DataFrame中的上一行/下一行,我们可以使用Window函数。 在这种情况下,我们将使用lag来访问上一个结束时间,按machineId分组。

import org.apache.spark.sql.expressions.Window 

// Dataframe Schema 
case class MachineData(id:String, start:Int, end:Int) 
// Sample Data 
| id|start|end| 
| 1| 0| 3| 
| 1| 4| 8| 
| 1| 10| 20| 
| 1| 20| 31| 
| 1| 412|578| 
| 2| 231|311| 
| 2| 781|790| 

// define the window as a partition over machineId, ordered by start (time) 
val byMachine = Window.partitionBy($"id").orderBy($"start") 
// we define a new column, "previous end" using the Lag Window function over the previously defined window 
val prevEnd = lag($"end", 1).over(byMachine) 

// new DF with the prevEnd column 
val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd) 

| id|start|end|prevEnd| 
| 1| 0| 3| null| 
| 1| 4| 8|  3| 
| 1| 10| 20|  8| 
| 1| 20| 31|  20| 
| 1| 412|578|  31| 
| 2| 231|311| null| 
| 2| 781|790| 311| 

// we're calculating the idle intervals as the numerical diff as an example 
val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd") 

| id|start|end|prevEnd|diff| 
| 1| 0| 3| null|null| 
| 1| 4| 8|  3| 1| 
| 1| 10| 20|  8| 2| 
| 1| 20| 31|  20| 0| 
| 1| 412|578|  31| 381| 
| 2| 231|311| null|null| 
| 2| 781|790| 311| 470| 

// to calculate the total, we are summing over the differences. Adapt this as your business logic requires. 
val totalIdleIntervals = idleIntervals.select($"id",$"diff").groupBy($"id").agg(sum("diff")) 

| id|sum(diff)| 
| 1|  384| 
| 2|  470| 

我今天学到了一些新东西,窗口函数.. +1 – Shankar


今天学习了lag()。解释非常明确和有用。谢谢你maasg! –