
I have a log file made up of "Events", "Time", and "UserId" columns, and I want to compute the average time between a user's events with pySpark:

+--------+----------------+--------+
| Events |      Time      | UserId |
+--------+----------------+--------+
| ClickA | 7/6/16 10:00am | userA  |
+--------+----------------+--------+
| ClickB | 7/6/16 12:00am | userA  |
+--------+----------------+--------+

I want to compute the average time between events for each user. How would you approach this? In a traditional programming setting I would walk through each of a user's events, compute the time delta between events n and n-1, append that value to an array A, and then take the average of A. How can I do this with Spark?

Answer


Ignoring the date parsing, this looks like a job for a window function followed by a simple aggregation, so roughly speaking you need something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, avg}

// toDF and the $"..." column syntax require the SQL implicits in scope,
// e.g. import spark.implicits._ (or sqlContext.implicits._ on older versions)

val df = Seq(
  ("ClickA", "2016-06-07 10:00:00", "UserA"),
  ("ClickB", "2016-06-07 12:00:00", "UserA")
).toDF("events", "time", "userid").withColumn("time", $"time".cast("timestamp"))

val w = Window.partitionBy("userid").orderBy("time")

// Difference between consecutive events in seconds
val diff = $"time".cast("long") - lag($"time", 1).over(w).cast("long")

df.withColumn("diff", diff).groupBy("userid").agg(avg($"diff"))
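Since the question asks for pySpark, a rough equivalent using the DataFrame API might look like the sketch below. It mirrors the Scala example above (same column names and sample data) and assumes a SparkSession is available; adapt the session setup to your Spark version.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("ClickA", "2016-06-07 10:00:00", "UserA"),
     ("ClickB", "2016-06-07 12:00:00", "UserA")],
    ["events", "time", "userid"]
).withColumn("time", col("time").cast("timestamp"))

w = Window.partitionBy("userid").orderBy("time")

# difference in seconds between the current event and the previous one per user
diff = col("time").cast("long") - lag(col("time"), 1).over(w).cast("long")

df.withColumn("diff", diff).groupBy("userid").agg(avg("diff")).show()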

Thanks zero323! Do you know how I can convert this string (2016/5/1 4:03:34 PM) to a timestamp? I can't find the right way to do it in pyspark. – Ahmet


Much like shown here: http://stackoverflow.com/a/36095322/1560062 but you'll have to adjust the format (https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) – zero323
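For reference, a minimal pyspark sketch of the conversion that comment describes; the SimpleDateFormat pattern "yyyy/M/d h:mm:ss a" is an assumption based on the sample value "2016/5/1 4:03:34 PM", so adjust it to match your actual data.

from pyspark.sql.functions import unix_timestamp, col

# parse strings like "2016/5/1 4:03:34 PM" into a proper timestamp column;
# the format string below is an assumption inferred from that sample value
df = df.withColumn(
    "time",
    unix_timestamp(col("time"), "yyyy/M/d h:mm:ss a").cast("timestamp")
)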