2017-10-12

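Filling missing dates in a Spark dataframe column

I have a Spark dataframe with a "date" column of type timestamp and a "quantity" column of type long. For each date I have a quantity value. The dates are sorted in ascending order, but some dates are missing. For example, the current DF: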

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
14-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
20-09-2016 | 2 

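As you can see, the DF has some missing dates, such as 12-09-2016, 13-09-2016, etc. I want to put 0 in the quantity field for those missing dates, so that the resulting DF looks like: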

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
12-09-2016 | 0 
13-09-2016 | 0 
14-09-2016 | 0 
15-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
18-09-2016 | 0 
19-09-2016 | 0 
20-09-2016 | 2 

Any help or suggestions on this would be appreciated. Thanks in advance. Please note that I am coding in Scala.

Answer


I have written this answer in a fairly verbose way so that the code is easy to follow. It can be optimized.

Required imports

import java.time.format.DateTimeFormatter 
import java.time.{LocalDate, LocalDateTime} 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.{LongType, TimestampType} 

UDF to convert the input string into a valid date format

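// Convert a "dd-MM-yyyy" string into a "yyyy-MM-dd" string that can be cast to a timestamp
val date_transform = udf((date: String) => {
  val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
  val dt = LocalDate.parse(date, dtFormatter)
  // Zero-pad directly instead of padding with spaces and replacing them afterwards
  "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
})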
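The conversion inside the UDF can be tried without Spark. Below is a minimal sketch using only `java.time`; the names `inputFormat` and `toIsoDate` are illustrative and not part of the original answer, and the pattern `d-M-uuuu` is an assumed equivalent of the answer's `d-M-y` for inputs like those in the question.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Assumed input layout: day-month-year, e.g. "10-09-2016"
val inputFormat = DateTimeFormatter.ofPattern("d-M-uuuu")

// LocalDate.toString already produces the ISO form yyyy-MM-dd
def toIsoDate(raw: String): String =
  LocalDate.parse(raw, inputFormat).toString

val iso = toIsoDate("10-09-2016")
println(iso)
```

Relying on `LocalDate.toString` avoids manual zero-padding, since the ISO output is what Spark expects when a string is cast to a timestamp.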

UDF to iterate over the date range

// Return the dates strictly between `start` and `start + excludedDiff` days.
// `start` is expected as the timestamp rendered in "yyyy-MM-dd HH:mm:ss" form.
val fill_dates = udf((start: String, excludedDiff: Int) => {
  val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val fromDt = LocalDateTime.parse(start, dtFormatter)
  (1 to (excludedDiff - 1)).map(day => {
    val dt = fromDt.plusDays(day)
    "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
  })
})
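To see what this UDF returns for a single row, the same range logic can be sketched in plain Scala. The helper name `datesInGap` is hypothetical, and it takes a `LocalDate` rather than the timestamp string the UDF receives.

```scala
import java.time.LocalDate

// Hypothetical helper: ISO dates strictly between `start` and `start + diff` days
def datesInGap(start: LocalDate, diff: Int): Seq[String] =
  (1 until diff).map(offset => start.plusDays(offset.toLong).toString)

// 11-09-2016 is followed by 14-09-2016 in the sample data, so diff is 3
val gap = datesInGap(LocalDate.of(2016, 9, 11), 3)
println(gap.mkString(", "))
```

A `diff` of 1 means the next date is the following day, so the range is empty and no rows are generated.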

Set up the sample dataframe (df)

val df = Seq(
     ("10-09-2016", 1), 
     ("11-09-2016", 2), 
     ("14-09-2016", 0), 
     ("16-09-2016", 1), 
     ("17-09-2016", 0), 
     ("20-09-2016", 2)).toDF("date", "quantity") 
     .withColumn("date", date_transform($"date").cast(TimestampType)) 
     .withColumn("quantity", $"quantity".cast(LongType)) 

df.printSchema() 
root 
|-- date: timestamp (nullable = true) 
|-- quantity: long (nullable = false) 


df.show()  
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

Create a temporary dataframe (tempDf) from df to union with it

val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // Keep only rows followed by a gap of more than one day
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit(0L)) // Long literal, so the union keeps quantity as long
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false) 
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+
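The lead / datediff / explode steps can be modelled on an ordinary Scala collection, which may help when checking the logic without a cluster. This is only a sketch: `gapRows` is a hypothetical name, and pairing each date with its successor stands in for the `lead` window function.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical helper: one (date, 0L) row for every day missing between
// consecutive entries of an ascending date sequence
def gapRows(sortedDates: Seq[LocalDate]): Seq[(LocalDate, Long)] =
  sortedDates.zip(sortedDates.drop(1)).flatMap { case (current, next) =>
    // Same role as datediff(lead(date), date) in the dataframe version
    val diff = ChronoUnit.DAYS.between(current, next)
    (1L until diff).map(offset => (current.plusDays(offset), 0L))
  }

val dates = Seq("2016-09-10", "2016-09-11", "2016-09-14",
                "2016-09-16", "2016-09-17", "2016-09-20").map(s => LocalDate.parse(s))
val gaps = gapRows(dates)
gaps.foreach(println)
```

The generated rows correspond to the five dates shown in the tempDf output, each with quantity 0.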

Now union the two dataframes

val result = df.union(tempDf.select("date", "quantity")) 
    .orderBy("date") 

result.show() 
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+
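Since the answer says the approach can be optimized, here is one possible variant that avoids UDFs. It is a sketch under two assumptions: Spark 2.4 or later, where the built-in `sequence` function is available, and a date column of DateType rather than timestamp. The session setup and the names `allDates` and `filled` are illustrative, not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for illustration only; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("fill-dates").getOrCreate()
import spark.implicits._

val df = Seq(
  ("2016-09-10", 1L), ("2016-09-11", 2L), ("2016-09-14", 0L),
  ("2016-09-16", 1L), ("2016-09-17", 0L), ("2016-09-20", 2L)
).toDF("date", "quantity")
  .withColumn("date", to_date($"date"))

// One row per calendar day between the smallest and largest date
val allDates = df
  .agg(min($"date").as("start"), max($"date").as("end"))
  .select(explode(sequence($"start", $"end", expr("interval 1 day"))).as("date"))

// Left join keeps every calendar day; days absent from df get quantity 0
val filled = allDates
  .join(df, Seq("date"), "left")
  .withColumn("quantity", coalesce($"quantity", lit(0L)))
  .orderBy("date")

filled.show()
```

This variant trades the window and the two UDFs for a join against a generated calendar, which also avoids moving all rows into a single partition for the unpartitioned window.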