5

我正在查看Spark SQL,Scala中的Spark DataFrame的窗口幻灯片函数。spark SQL窗口函数滞后

我有一个数据帧与列Col1,Col1,Col1,日期。

Col1 Col2 Col3 date  volume new_col 
         201601 100.5 
         201602 120.6 100.5 
         201603 450.2 120.6 
         201604 200.7 450.2 
         201605 121.4 200.7` 

现在我想添加一个名为new_col的新列,其中一行向下滑动,如上所示。

我试过下面的选项来使用窗口函数。

val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1)) 

任何人都可以请帮助我如何做到这一点。

+0

@Ramesh until Spark 2.0,用户不得不使用'HiveContext'而不是'SQLContext'来应用窗口函数。通过传递'SparkContext'的实例,''HiveContext'的创建方式与'SQLContext'相同。如果我没有记错的话,你还需要为你的Spark发行版加入'org.apache.spark:spark-hive_2.10'。 –

+0

@msrinivas,谢谢你的回答是正确的。 – Ramesh

回答

9

你正确你错过了做的是over(window expression)lag

val df = sc.parallelize(Seq((201601, 100.5), 
    (201602, 120.6), 
    (201603, 450.2), 
    (201604, 200.7), 
    (201605, 121.4))).toDF("date", "volume") 

val w = org.apache.spark.sql.expressions.Window.orderBy("date") 

import org.apache.spark.sql.functions.lag 

val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w)) 

leadDf.show() 

+------+------+-------+ 
| date|volume|new_col| 
+------+------+-------+ 
|201601| 100.5| 0.0| 
|201602| 120.6| 100.5| 
|201603| 450.2| 120.6| 
|201604| 200.7| 450.2| 
|201605| 121.4| 200.7| 
+------+------+-------+ 

此代码对星火外壳已运行2.0.2

+0

我没有1.5.2安装程序,并面临Maven在我的机器中加载1.5.2(spark-hive jar)的问题。 – mrsrinivas

+0

我现在可以创建配置单元环境。但是我仍然得到同样的错误。 – Ramesh

+0

我认为,因为数据框是使用sqlcontext创建的,我仍然无法使用窗口函数。 – Ramesh

1

您可以导入以下两个包,这将解决滞后的问题依赖。

import org.apache.spark.sql.functions.{lead, lag} 
import org.apache.spark.sql.expressions.Window