
value toDF is not a member of org.apache.spark.rdd.RDD

I have read about this issue in other SO posts, but I still don't know what I am doing wrong. In principle, adding these two lines:

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.implicits._ 

should do the trick, but the error persists.
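
For reference, the usual Spark 1.6 pattern that brings toDF into scope looks like the sketch below (in the spark-shell, for example). Record is an illustrative name and sc is assumed to be an existing SparkContext:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._ // brings the toDF conversion for RDDs of Products into scope

case class Record(a: Int, b: String) // a simple Product type for the rows

val df = sc.parallelize(Seq(Record(1, "x"), Record(2, "y"))).toDF()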

This is my build.sbt:

name := "PickACustomer" 

version := "1.0" 

scalaVersion := "2.11.7" 


libraryDependencies ++= Seq("com.databricks" %% "spark-avro" % "2.0.1", 
"org.apache.spark" %% "spark-sql" % "1.6.0", 
"org.apache.spark" %% "spark-core" % "1.6.0") 

And my Scala code is:

import scala.collection.mutable.Map 
import scala.collection.immutable.Vector 

import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql._ 


object Foo {

  def reshuffle_rdd(rawText: RDD[String]): RDD[Map[String, (Vector[(Double, Double, String)], Map[String, Double])]] = {...}

  def do_prediction(shuffled: RDD[Map[String, (Vector[(Double, Double, String)], Map[String, Double])]], prediction: (Vector[(Double, Double, String)] => Map[String, Double])): RDD[Map[String, Double]] = {...}

  def get_match_rate_from_results(results: RDD[Map[String, Double]]): Map[String, Double] = {...}

  def retrieve_duid(element: Map[String, (Vector[(Double, Double, String)], Map[String, Double])]): Double = {...}

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    if (!conf.getOption("spark.master").isDefined) conf.setMaster("local")

    val sc = new SparkContext(conf)

    // This should do the trick
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val PATH_FILE = "/mnt/fast_export_file_clean.csv"
    val rawText = sc.textFile(PATH_FILE)
    val shuffled = reshuffle_rdd(rawText)

    // PREDICT AS A FUNCTION OF THE LAST SEEN UID
    val results = do_prediction(shuffled.filter(x => retrieve_duid(x) > 1), predict_as_last_uid)
    results.cache()

    case class Summary(ismatch: Double, t_to_last: Double, nflips: Double, d_uid: Double, truth: Double, guess: Double)

    val summary = results.map(x => Summary(x("match"), x("t_to_last"), x("nflips"), x("d_uid"), x("truth"), x("guess")))

    // PROBLEMATIC LINE
    val sum_df = summary.toDF()
  }
}

I always get:

value toDF is not a member of org.apache.spark.rdd.RDD[Summary]

A bit lost now. Any ideas?

Can you at least type your values and give us the definitions of the methods you are using? – eliasah

Try defining your 'case class Summary' outside of main – drstein

@eliasah, sorry, I'm a bit new to Scala and didn't realize that would help. Please see the edit. – elelias

Answers

Move your case class outside of main:

object Foo { 

    case class Summary(ismatch: Double, t_to_last:Double, nflips:Double,d_uid: Double, truth:Double, guess:Double) 

    def main(args: Array[String]){ 
    ... 
    } 

} 

Something about its scope there prevents Spark from handling the automatic derivation of the schema for Summary. FYI, I actually got a different error from sbt:

No TypeTag available for Summary
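
As a minimal self-contained sketch of the fixed layout (the object name and sample values are invented for illustration; Spark 1.6, as in the build.sbt above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case class: the compiler can now provide the TypeTag
// that Spark needs to derive the DataFrame schema.
case class Summary(ismatch: Double, t_to_last: Double, nflips: Double,
  d_uid: Double, truth: Double, guess: Double)

object SummaryToDF {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SummaryToDF").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Tiny stand-in for the real results.map(...) pipeline
    val summary = sc.parallelize(Seq(
      Summary(1.0, 0.5, 2.0, 3.0, 1.0, 1.0),
      Summary(0.0, 1.5, 1.0, 7.0, 0.0, 1.0)))

    val sum_df = summary.toDF() // compiles once Summary is top-level
    sum_df.printSchema()
    sc.stop()
  }
}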

Wow, that DID work! Thanks a lot – elelias

If it works, you can accept the answer :) – eliasah

Yes, and thank you for your help @eliasah – elelias

Great. Saved my life.

Move your case class outside of main:

object Foo { 

    case class Summary(ismatch: Double, t_to_last:Double, nflips:Double,d_uid: Double, truth:Double, guess:Double) 

    def main(args: Array[String]){ 
    ...
    } 
} 