2017-05-19 149 views
0

相关获得列值比方说,我有三列的数据帧:在星火1.6 /斯卡拉,与骨料

itemid, date, price 
1, 2017-05-18, $1.10 
2, 2017-05-18, $2.20 
1, 2017-04-12, $0.90 
1, 2017-03-29, $1.00 

现在,我想按的itemid,得到的最早日期,并获得价格匹配最早的日期。 (我们可以假设(的itemid,日期)是唯一的)

的输入输出上面会:

1, 2017-03-29, $1.00 
2, 2017-05-18, $2.20 

在SQL中,我可以用做自联接 - 第一选择每个itemid的最短日期,然后选择日期与最低日期匹配的价格和日期。

我该如何在Scala Spark DataFrame中表达这一点? 如果答案仍然涉及到自连接,那么Spark 1.6中的DataFrame查询执行器是否足够聪明,不足以实际实现连接?

回答

1

一种方法是使用类似于以下SparkSQL窗函数:

import org.apache.spark.sql.expressions.Window 

val df = Seq(
    (1, "2017-05-18", 1.10), 
    (2, "2017-05-18", 2.20), 
    (1, "2017-04-12", 0.90), 
    (1, "2017-03-29", 1.00) 
).toDF(
    "itemid", "date", "price" 
).as[(Integer, String, Double)] 

// Add earliest date by itemid via window function and 
// keep only rows with earliest date by itemid 
val df2 = df.withColumn("earliestDate", min("date").over(
    Window.partitionBy("itemid") 
)). 
    where($"date" === $"earliestDate") 

df2.show 
+------+----------+-----+------------+ 
|itemid|  date|price|earliestDate| 
+------+----------+-----+------------+ 
|  1|2017-03-29| 1.0| 2017-03-29| 
|  2|2017-05-18| 2.2| 2017-05-18| 
+------+----------+-----+------------+ 
+0

谢谢你的解决方案。事实证明,这与自连接方法有点类似 - 因为(itemid,date)已经是一个有保证的唯一键,我可以使用标准聚合计算最小价格换ID,然后重新加入。我不是使用行ID,而是使用唯一键,而不是使用窗口,我可以使用groupBy()。 –

+0

@Jon Watte,是的,使用groupBy和self-join(itemid,date)是唯一的,不需要创建一个唯一的列。事实上,在这种简单的情况下,如果使用窗口函数,则不需要自连接(因此肯定不需要创建唯一的rowid)。我已经更新了我的答案。 –

+0

感谢您的澄清,并给出正确答案! –