2017-02-07

Filtering an RDD of CustomObject: type mismatch

I have a custom Scala object like this (basically a Java POJO):

object CustomObject { 

    implicit object Mapper extends JavaBeanColumnMapper[CustomObject] 

} 


class CustomObject extends Serializable { 


    @BeanProperty 
    var amount: Option[java.lang.Double] = _ 

    ... 
} 

In my main class, I have loaded an RDD containing these CustomObjects. I am trying to filter it and create a new RDD containing only the objects with amount > 5000:

val customObjectRDD = sc.objectFile[CustomObject]("objectFiles") 
val filteredRdd = customObjectRDD.filter(x => x.amount > 5000) 
println(filteredRdd.count()) 

However, my editor reports:

Type mismatch, expected: (CustomObject) => Boolean, actual: (CustomObject) => Any

What can I do to make this work?

Answers


The > operator is not defined on Option[Double], so your filter predicate needs to handle the Option itself:

scala> case class A(amount: Option[Double]) 
defined class A 

scala> val myRDD = sc.parallelize(Seq(A(Some(10000d)), A(None), A(Some(5001d)), A(Some(5000d)))) 
myRDD: org.apache.spark.rdd.RDD[A] = ParallelCollectionRDD[12] at parallelize at <console>:29 

scala> myRDD.filter(_.amount.exists(_ > 5000)).foreach{println} 
A(Some(10000.0)) 
A(Some(5001.0)) 

This assumes that any object with amount = None should fail the filter predicate. For the definition of Option.exists, see the docs.
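If objects with a missing amount should pass the filter instead, Option.forall is the counterpart of exists: it returns true for None. The following is a minimal sketch of the difference on plain Scala collections, with no Spark involved. The object name OptionFilterSketch, the threshold value, and the helper names keepAbove and keepAboveOrMissing are made up for illustration; the case class A mirrors the one defined in the REPL session above.

```scala
// Illustrative stand-in for CustomObject; only the amount field is modeled.
final case class A(amount: Option[Double])

object OptionFilterSketch {
  // Hypothetical threshold taken from the question's "amount > 5000".
  val threshold = 5000.0

  // exists is false for None, so objects without an amount are dropped.
  def keepAbove(xs: Seq[A]): Seq[A] =
    xs.filter(_.amount.exists(_ > threshold))

  // forall is true for None, so objects without an amount are kept.
  def keepAboveOrMissing(xs: Seq[A]): Seq[A] =
    xs.filter(_.amount.forall(_ > threshold))

  def main(args: Array[String]): Unit = {
    val xs = Seq(A(Some(10000d)), A(None), A(Some(5001d)), A(Some(5000d)))
    println(keepAbove(xs))          // List(A(Some(10000.0)), A(Some(5001.0)))
    println(keepAboveOrMissing(xs)) // List(A(Some(10000.0)), A(None), A(Some(5001.0)))
  }
}
```

The same predicates should carry over unchanged to RDD.filter, since each one is an ordinary function from A to Boolean.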