2016-12-08

I want to fill null values in Spark with the last known good observation - see: Spark/Scala: fill nan with last good observation, Spark map partitions to fill nan values.

My current solution uses a window function to accomplish the task. But this is not good, because all values are pulled into a single partition. `val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }` should scale better. But strangely my `fill` function is never executed. What is wrong with my code?

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|  null|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

Here is the full example code:

import java.sql.Date 

import org.apache.log4j.{ Level, Logger } 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SparkSession 

case class FooBar(foo: Date, bar: String) 

object WindowFunctionExample extends App {

  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(
    ("2016-01-01", "first"),
    ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show

  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  // Collect, per partition, the last row with a valid date (if any).
  val toCarry = recordsDF.rdd
    .mapPartitionsWithIndex { case (i, iter) =>
      Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption))
    }
    .collectAsMap
  println("###################### carry ")
  println(toCarry)
  toCarry.foreach(println)
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)

  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map { row =>
      if (!notMissing(row))
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    }
  }

  // The algorithm never steps into the branch that fills the null values. Strange.
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

EDIT

I fixed the code as outlined in the comments. But toCarryBd still contains None values. How can this happen, when I explicitly filter with

def notMissing(row: FooBar): Boolean = { row.foo != null } 
iter.filter(notMissing(_)).toSeq.lastOption 

I still get None values:

(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 

Trying to access toCarryBd then results in a NoSuchElementException: None.get.
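One way to cope with the None entries (a sketch, not part of the original post; `resolveCarry` is a hypothetical helper): when the map entry for a preceding partition is None because that partition was empty, walk further back until a non-empty partition is found. This is plain Scala over the already-collected map:

```scala
// Resolve the carry value to use before partition `i`: skip over
// empty partitions (None entries) and take the nearest earlier Some.
def resolveCarry[A](toCarry: collection.Map[Int, Option[A]], i: Int): Option[A] =
  (0 until i).reverse.collectFirst {
    case j if toCarry.getOrElse(j, None).isDefined => toCarry(j).get
  }

val carry = Map(0 -> Some("first"), 1 -> None, 2 -> None, 3 -> Some("second"))
println(resolveCarry(carry, 3)) // Some(first) - empty partitions 2 and 1 are skipped
println(resolveCarry(carry, 0)) // None - nothing comes before the first partition
```

`fill` could then fall back to something like `resolveCarry(toCarryBd.value, i)` instead of the unguarded `toCarryBd.value(i).get`.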

Answer


First of all, if your foo field can be null, I would suggest creating the case class as:

case class FooBar(foo: Option[Date], bar: String) 

Then you can rewrite your notMissing function like this:

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
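With the Option-typed field, an empty partition simply yields None instead of blowing up later with None.get. A minimal plain-Scala illustration of this (the dates are stand-ins for the ones in the question):

```scala
import java.sql.Date

case class FooBar(foo: Option[Date], bar: String)

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined

val partition = Seq(
  FooBar(Some(Date.valueOf("2016-01-01")), "first"),
  FooBar(None, "noValidFormat"))

// lastOption is already an Option, so the empty case is safe by construction
val carry = partition.filter(r => notMissing(Some(r))).lastOption
println(carry) // Some(FooBar(Some(2016-01-01),first))

val emptyCarry = Seq.empty[FooBar].filter(r => notMissing(Some(r))).lastOption
println(emptyCarry) // None
```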

Can you explain why the map produces so many None entries? –


@GeorgHeiler If the Seq is empty, `iter.filter(notMissing(_)).toSeq.lastOption` will not return any value (None). –


Thanks. Why does it execute 8 times when the original df contains only 4 rows? –
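A likely explanation (an assumption, not confirmed in the thread): with `.setMaster("local[*]")`, Spark's default parallelism is the number of cores, so a 4-row dataset is still spread over e.g. 8 partitions on an 8-core machine, leaving most of them empty. The plain-Scala sketch below mimics one possible placement of 4 rows into 8 slots:

```scala
// Simulate spreading 4 rows across 8 partitions round-robin:
// half the partitions receive no rows at all, hence the None entries.
val rows = Seq("first", "second", "noValidFormat", "lastAssumingSameDate")
val numPartitions = 8
val partitions = (0 until numPartitions).map { i =>
  rows.zipWithIndex.collect { case (r, idx) if idx % numPartitions == i => r }
}
println(partitions.count(_.isEmpty)) // 4 of the 8 simulated partitions are empty
```

Calling `recordsDF.rdd.coalesce(n)` with a small `n` before `mapPartitionsWithIndex` is one way to reduce the number of empty partitions.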