2016-07-05 99 views
0

我有用户游戏会话,其中包含:用户ID,游戏ID,分数和游戏进行时的时间戳。从后续项目中总结第一个分组的项目

from pyspark import SparkContext 
from pyspark.sql import HiveContext 
from pyspark.sql import functions as F 

sc = SparkContext("local") 

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame([ 
    ("u1", "g1", 10, 0), 
    ("u1", "g3", 2, 2), 
    ("u1", "g3", 5, 3), 
    ("u1", "g4", 5, 4), 
    ("u2", "g2", 1, 1), 
], ["UserID", "GameID", "Score", "Time"]) 

所需的输出

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 

我要变换的数据,我得到的第一场比赛的用户播放以及第二场比赛的最高分的最高分(奖金如果我也可以获得所有后续游戏的最高分数)。不幸的是,我不确定用Spark SQL可以做什么。

我知道我可以通过UserID,GameID进行分组,然后通过agg来获得最高分和最短时间。不知道如何从那里继续。

说明:请注意MaxScoreGame1和MaxScoreGame2是指第一个和第二个游戏用户播放器;不是GameID。

回答

1

您可以尝试使用Window函数和Pivot的组合。

  1. 获取按时间排序的用户ID划分的每个游戏的行号。
  2. 过滤掉GameNumber为1或2.
  3. 旋转即可获得所需的输出形状。

不幸的是我使用scala而不是python,但下面的代码应该很容易转换为python库。

import org.apache.spark.sql.expressions.Window 

// Use a window function to get row number 
val rowNumberWindow = Window.partitionBy(col("UserId")).orderBy(col("Time")) 

val output = { 
    df 
    .select(
     col("*"), 
     row_number().over(rowNumberWindow).alias("GameNumber") 
    ) 
    .filter(col("GameNumber") <= lit(2)) 
    .groupBy(col("UserId")) 
    .pivot("GameNumber") 
    .agg(
     sum(col("Score")) 
    ) 
} 

output.show() 

+------+---+----+ 
|UserId| 1| 2| 
+------+---+----+ 
| u1| 10| 2| 
| u2| 1|null| 
+------+---+----+ 
+1

另外,如果你想看到两个以上的游戏在输出就是不过滤和枢轴将其余的工作补充。 – Blakey

+0

窗口和row_number做了诀窍。我将在PySpark中发布我的解决方案,它有点不同。你可以验证你的代码是否适合演出,所以我可以给你答案? – ksindi

+1

刚刚更新输出,也注意到我实际上使用select而不是groupBy在数据透视表上,这是行不通的。根据你的帖子(“u1”,“g3”,2,2),(“u1”,“u1”),对于你如何得到5作为用户1的第二游戏得分,假设原始数据框中存在拼写错误“g3”,5,3), – Blakey

1

解决方案与PySpark:

from pyspark.sql import Window 

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time")) 

(df 
.groupBy("UserID", "GameID") 
.agg(F.max("Score").alias("Score"), 
     F.min("Time").alias("Time")) 
.select(F.col("*"), 
     F.row_number().over(rowNumberWindow).alias("GameNumber")) 
.filter(F.col("GameNumber") <= F.lit(2)) 
.withColumn("GameMaxScoreCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber"))) 
.groupBy("UserID") 
.pivot("GameMaxScoreCol") 
.agg(F.max("Score")) 
).show() 

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 
相关问题