2016-10-26 22 views
3

所以我有一个用户名的数据框他们已经发布的线程和这些帖子的时间戳。如果要弄清楚谁是线程的第一个用户,以及什么时间是什么,我该怎么办。我知道要弄清楚第一篇文章是在线上做一个小组,然后在时间戳上做一个小时。但是,这将删除用户名。我如何使用该组并保留用户名?使用group by时,在Spark中保留未使用的列?

+2

按用户名称分组 – eliasah

回答

1

您可以通过使用HiveContext和Hive的named_struct函数来执行此操作。诀窍是min将按照从左到右的顺序评估列来处理结构,如果当前列相同,则只移动到下一列。因此,在这种情况下,它实际上只是比较时间戳列,但是通过在min函数吐出结果之后创建一个包含名称的结构,您将可以访问该结构。

data = [ 
    ('user', 'thread', 'ts'), 
    ('ryan', 1, 1234), 
    ('bob', 1, 2345), 
    ('bob', 2, 1234), 
    ('john', 2, 2223) 
] 

header = data[0] 
rdd = sc.parallelize(data[1:]) 
df = sqlContext.createDataFrame(rdd, header) 
df.registerTempTable('table') 

sql = """ 
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest 
FROM table 
GROUP BY thread 
""" 

grouped = sqlContext.sql(sql) 
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp') 
1

这可以使用row_number()窗口函数来完成,这将使所有其他列保持不变。 使用withColumn创建一个类似“thread_user_order”的新列,其值应为row_number()PARTITION BY线程ORDER BY ts。 然后过滤器“thread_user_order”== 1

下面是一些伪代码:

df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order").equalTo(1)) 
1

您可以使用该结构的排序为场通过顺序和排序同时保留两列。然后,当您拨打min时,它会先按时间戳排序,然后再按用户名(如果/当两次相连)进行排序。

user_time = functions.struct(df.timestamp, df.username).alias('user_time') 
min_thread_users_df = df.select(df.thread, user_time).groupby('thread').agg(
    functions.min('user_time').alias('user_time')).select(
    'thread', 'user_time.username', 'user_time.timestamp')