2017-01-04 70 views
1

如何处理空分区mapPartitionsWithIndex火花mapPartitionsWithIndex处理空分区

完整的例子可以发现:https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

我的目标是通过RDD的一个已知的完好价值的Spark/Scala: fill nan with last good observation的改进,以填补NaN值。

但一些分区不包含任何值:

###################### carry 
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None) 
(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 
() 
###################### carry 

case class FooBar(foo: Option[Date], bar: String) 
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), 
    ("2016-wrongFormat", "noValidFormat"), 
    ("2016-01-04", "lastAssumingSameDate")) 
    .toDF("foo", "bar") 
    .withColumn("foo", 'foo.cast("Date")) 
    .as[FooBar] 
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
myDf.rdd.filter(x => notMissing(Some(x))).count 
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap 

当使用

val toCarryBd = spark.sparkContext.broadcast(toCarry) 
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = { 
    if (iter.isEmpty) { 
     iter 
    } else { 
     var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get 
     iter.map(foo => { 
     println("original ", foo) 
     if (!notMissing(Some(foo))) { 
      println("replaced") 
      // this will go into the default case 
      // FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar) 
      FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error 
     } else { 
      lastNotNullRow = Some(foo) 
      foo 
     } 
     }) 
    } 
    } 

    val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) } 

填补它会崩溃的值。

编辑

如果从答案中应用输入,则输出。还没100%有

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|2016-01-04|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

回答

1

至于工作mapPartitions(以及类似)处理时,空分区,一般的方法是,当你有一个空的输入迭代器返回正确类型的空迭代。

它看起来像你的代码是这样做的,但是它看起来像你的应用程序逻辑中可能有一个错误(即它假定如果一个分区有一个记录缺少一个值,它将有一个前一行相同的分区,这是好的,或者以前的分区不是空的并且具有好的行 - 这不一定是这种情况)。你已经部分解决了这个问题,并且对于每个分区收集最后一个好值,然后如果在分区开始时没有很好的值,请查看收集数组中的值。

但是,如果这也发生在同一时间以前的分区为空,您将需要去查找以前的分区值,直到找到您正在查找的分区值。 (请注意,假定数据集中的第一条记录是有效的,如果不是,您的代码仍然会失败)。

您的解决方案非常接近工作,但只是有一些小的假设,并不总是必要的。

+0

感谢您的评论。这有助于我填补下一个,但不是最后一个已知的价值。 –

+0

对,你只需要向后搜索做最后一次已知的好事。 – Holden

+0

你的意思是代替+1 a -1:'while(lastNotNullRow == None){last_NotNullRow = toCarryBd.value.get(i + 1).get }'?但是如果第一个分区是空的,那么这将不起作用(在这种情况下),我认为替换映射已经按照正确的顺序。 –