2017-08-23 129 views
0

我的数据集是这样的:(前三列输入,我添加的列4-6,最后一列代表所需的输出)计算运行时间

+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 
|   timestamp|status| msg|  end_timestamp|   start_eng|   stop_eng| --|res | 
+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 
|2017-01-01 06:15:00| ASC_a|nice|2017-01-01 07:00:00|    null|    null|-->| 0 | 
|2017-01-01 07:00:00| ASC_a|nice|2017-01-01 07:15:00|    null|    null|-->| 0 | 
|2017-01-01 07:15:00| start|nice|2017-01-01 08:00:00|2017-01-01 07:15:00|    null|-->| 45 | 
|2017-01-01 08:00:00| start|nice|2017-01-01 08:22:00|2017-01-01 08:00:00|    null|-->| 22 | 
|2017-01-01 08:22:00| ASC_b|init|2017-01-01 09:00:00|    null|    null|-->| 38 | 
|2017-01-01 09:00:00| ASC_b|init|2017-01-01 09:30:00|    null|    null|-->| 30 | 
|2017-01-01 09:30:00| end| bla|2017-01-01 10:00:00|    null|2017-01-01 09:30:00|-->| 0 | 
|2017-01-01 10:00:00| end| bla|2017-01-01 10:45:00|    null|2017-01-01 10:00:00|-->| 0 | 
|2017-01-01 10:45:00| ASC_a|meas|2017-01-01 11:00:00|    null|    null|-->| 0 | 
|2017-01-01 11:00:00| ASC_a|meas|2017-01-01 12:00:00|    null|    null|-->| 0 | 
|2017-01-01 12:00:00| ASC_a|meas|2017-01-01 12:15:00|    null|    null|-->| 0 | 
|2017-01-01 12:15:00| start|meas|2017-01-01 13:00:00|2017-01-01 12:15:00|    null|-->| 45 | 
|2017-01-01 13:00:00| start|meas|2017-01-01 13:22:00|2017-01-01 13:00:00|    null|-->| 22 | 
|2017-01-01 13:22:00| ASC_c|init|2017-01-01 14:00:00|    null|    null|-->| 38 | 
|2017-01-01 14:00:00| ASC_c|init|2017-01-01 14:31:00|    null|    null|-->| 31 | 
|2017-01-01 14:31:00| end|meas|    null|    null|2017-01-01 14:31:00|-->| 0 | 
+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 

我要计算的从状态启动的第一次发生到状态结束的第一次发生时的引擎运行时间。 (状态的开始和结束都出现在随后的列中,因为我添加了具有爆炸功能的行,但我仍然必须稍后将它们更改为合理的值)

问题是我不知道如何计算引擎运行时开始和结束之间既不包含开始也不包含结束的行。
我想过使用窗口函数进行计算,但我不知道如何为此指定窗口。

+0

你还可以分享到目前为止尝试过的代码吗? –

+0

spark rdd是完全分布式的问题,你可以在不同的分区中找到'start',行和'end'之间的所有行,你不能在开始和结束之间有一个特定的键字段? – Mehrez

+0

如果我找到一种方法来摆脱重复的开始结束条目,并填写他们的状态与最后的ASC状态,我想我可以做几乎完全一样的方式,我做到了这里https://stackoverflow.com/questions/ 45815464 /正确的方法对填充数据集,以数据为基础的上窗口。 – user2811630

回答

0

我终于得到它为小数据集工作。仍然必须对它进行测试。

//get tempstat column 
    val ds3 = ds2.withColumn("tempstat", when($"status".contains("ASC"), $"status").otherwise(null)) 
     .withColumn("tempstat_final", last($"tempstat", true).over(window)) 

    //remove duplicate status 
    val ds5 = ds3.withColumn("new_status", when(!$"status".contains("ASC") && lag($"status", 1).over(window) =!= $"status", $"status").otherwise($"tempstat_final")) 

    //get column that provides window for calculation 
    val ds6 = ds5.withColumn("startFlag", when($"new_status" === "start", 1).otherwise(0)) 
     .withColumn("stopFlag", when($"new_status" === "end", -1).otherwise(0)) 
     .withColumn("bothFlags", $"startFlag" + $"stopFlag") 
     .withColumn("engineFlag", sum($"bothFlags").over(Window.orderBy("timestamp"))) 

    //calculate runtime 
    val ds7 = ds6.withColumn("runtime", when($"engineFlag" === 1, 
     ((unix_timestamp(lead($"timestamp", 1).over(Window.orderBy($"timestamp"))) - unix_timestamp($"timestamp"))/60) 
    ).otherwise(lit(0))) 

输出和级数如下所示。

+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------+ 
|   timestamp|status| msg|tempstat|tempstat_final|new_status|startFlag|stopFlag|bothFlags|engineFlag|runtime| 
+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------+ 
|2017-01-01 06:15:00| ASC_a|nice| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 07:00:00| ASC_a|nice| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 07:15:00| start|nice| null|   ASC_a|  start|  1|  0|  1|   1| 45.0| 
|2017-01-01 08:00:00| start|nice| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 22.0| 
|2017-01-01 08:22:00| ASC_b|init| ASC_b|   ASC_b|  ASC_b|  0|  0|  0|   1| 38.0| 
|2017-01-01 09:00:00| ASC_b|init| ASC_b|   ASC_b|  ASC_b|  0|  0|  0|   1| 30.0| 
|2017-01-01 09:30:00| end| bla| null|   ASC_b|  end|  0|  -1|  -1|   0| 0.0| 
|2017-01-01 10:00:00| end| bla| null|   ASC_b|  ASC_b|  0|  0|  0|   0| 0.0| 
|2017-01-01 10:45:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 11:00:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 12:00:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 12:15:00| start|meas| null|   ASC_a|  start|  1|  0|  1|   1| 45.0| 
|2017-01-01 13:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 60.0| 
|2017-01-01 14:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 60.0| 
|2017-01-01 15:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 22.0| 
|2017-01-01 15:22:00| ASC_c|init| ASC_c|   ASC_c|  ASC_c|  0|  0|  0|   1| 38.0| 
|2017-01-01 16:00:00| ASC_c|init| ASC_c|   ASC_c|  ASC_c|  0|  0|  0|   1| 31.0| 
|2017-01-01 16:31:00| end|meas| null|   ASC_c|  end|  0|  -1|  -1|   0| 0.0| 
+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------++ 

我总是乐于获得有关改进或其他解决方案建议的提示,因为我仍然对spark/scala很陌生。