2014-11-04 152 views
0
// 4 workers 
    val sc = new SparkContext("local[4]", "naivebayes") 

    // Load documents (one per line). 
    val documents: RDD[Seq[String]] = sc.textFile("/tmp/test.txt").map(_.split(" ").toSeq) 

    documents.zipWithIndex.foreach{ 
    case (e, i) => 
    val collectedResult = Tokenizer.tokenize(e.mkString) 
    } 

    val hashingTF = new HashingTF() 
    //pass collectedResult instead of document 
    val tf: RDD[Vector] = hashingTF.transform(documents) 

    tf.cache() 
    val idf = new IDF().fit(tf) 
    val tfidf: RDD[Vector] = idf.transform(tf) 
在上面的代码片断

,我想提取collectedResult重用它hashingTF.transform,如何才能实现这一目标,其中记号化功能的签名是转换斯卡拉字符串RDD [SEQ [字符串]

def tokenize(content: String): Seq[String] = { 
... 
} 

回答

1

看起来像你想map而不是foreach。我不明白你在为什么使用zipWithIndex,也不知道你为什么要在你的线路上打电话split,直接与mkString再次联系。

val lines: Rdd[String] = sc.textFile("/tmp/test.txt") 
val tokenizedLines = lines.map(tokenize) 
val hashes = tokenizedLines.map(hashingTF) 
hashes.cache() 
... 
+0

@Imm我该如何声明其他rdd?对不起,我是一个新手! – Siva 2014-11-04 09:45:42

+0

你说你想把该函数的返回值追加到其他一些'RDD [Seq [String]]',否? 'otherRdd'是你想追加的。 – lmm 2014-11-04 09:54:22

+0

inp.zipWithIndex.foreach {e,i)=> val result:RDD [Seq [String]] ++ = sc.parallelize(Seq(Tokenizer.tokenize(e))) } 我想这个是错的,我想把结果声明为循环,追加它并得到一个rdd。我不知道我该如何做到这一点。 – Siva 2014-11-04 10:07:00

相关问题