2016-06-10 29 views
0

我在spark数据框中有最终记录(在连接和过滤之后)。我需要比较连续行的(按键分区)列值并基于条件需要更改e_date列值例如:如何在Spark-scala中实现LEAD和LAG

sample table 
    key1 key 2 col1 col2 s_date  e_date 
    a  1  cv1  cv2 2014   2099 
    a  1  cv3  cv2 2016   2099 
    b  2  cv5  cv6 2016   2099 
    b  2  cv5  cv6 2016   2099 

    final table should look like 
    key1 key 2 col1 col2 s_date  e_date 
    a  1  cv1  cv2 2014   2015 (next records s_date-1) 
    a  1  cv3  cv2 2016   2099 
    b  2  cv5  cv6 2016   2099 
  1. 上述表具有复合密钥,以便KEY1和KEY2是键

  2. 通过键比较在分区col1和COL2值

  3. 如果任何列具有与新记录的s_date -1(在最后的表线1,2)

  4. 如果没有变化,则忽略(在最后的表线3)新的记录

任何新值结束旧记录指针斯卡拉火花

回答

4

超前和滞后已经实施:

import org.apache.spark.sql.functions.{lead, lag} 
import org.apache.spark.sql.expressions.Window 

lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date)) 

检查Introducing Window Functions in Spark SQL了解详情。

+0

此解决方案仅适用于您的窗口规范中的每个分区足够小以适合一个执行程序节点。 –