2015-11-17 50 views
0

我想声明一个函数来得到两个RDDcogroup。其实这是一个interSectionByKey。下面的代码不能编译:如何声明函数来调用cogroup

def getRetain[K, V](activeUserRdd : RDD[(K, V)], newUserRdd : RDD[(K, V)]): RDD[(K, V)] ={ 
    activeUserRdd.cogroup(newUserRdd).flatMapValues{ 
     x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else null).asInstanceOf[V]) 
    } 
    } 

错误:

value cogroup is not a member of org.apache.spark.rdd.RDD[(K, V)] 

我觉得(K, V)匹配小姐在cogroup声明的真正[(K, V)],但是这是在我的函数声明的正确方法?

回答

0

ClassTag应用于您的输入类型,以确保在运行时可以访问已擦除类型KV。这是由于type erasure in Scala

scala> import scala.reflect.ClassTag 
import scala.reflect.ClassTag 

scala> def getRetain[K : ClassTag, V : ClassTag](activeUserRdd : RDD[(K, V)], newUserRdd : RDD[(K, V)]): RDD[(K, V)] ={ 
|  activeUserRdd.cogroup(newUserRdd).flatMapValues{ 
|   x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else null).asInstanceOf[V]) 
|  } 
|  } 
getRetain: [K, V](activeUserRdd: org.apache.spark.rdd.RDD[(K, V)], newUserRdd: org.apache.spark.rdd.RDD[(K, V)])(implicit evidence$1: scala.reflect.ClassTag[K], implicit evidence$2: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[(K, V)] 
+0

我没有这个包scala.reflect.TypeTag,你知道一些其他的解决方案,只是使用原来的阶。我认为它可以帮助我理解scala。 编译时发生错误。谢谢。 – yyforever1988

+0

对不起,我打错输入。请再试一次。 –

+0

它的工作,谢谢 – yyforever1988