2017-08-12 65 views
0

任何人都可以帮助接受返回迭代器listWords()方法mapPartitions。映射分区迭代器返回

object MapPartitionExample { 

    def main(args: Array[String]): Unit = { 

    val conf= new SparkConf().setAppName("MapPartitionExample").setMaster("local[*]") 
    val sc= new SparkContext(conf) 

    val input:RDD[String] = sc.parallelize(List("ABC","DEF","GHU","YHG")) 

    val x= input.mapPartitions(word => listWords(word)) 


    } 

    def listWords(words: Iterator[String]) : util.Iterator[String] = { 

    val arrList = new util.ArrayList[String]() 
    while(words.hasNext) { 
     arrList.add(words.next()) 
    } 
    return arrList.iterator() 
    } 

} 

回答

0

Iterable[NotInferU]预期,但你是通过导入scala.collection.JavaConversions._如下

def listWords(words: Iterator[String]) : Iterator[String] = { 
    val arrList = new util.ArrayList[String]() 
    while(words.hasNext) { 
     arrList.add(words.next()) 
    } 
    import scala.collection.JavaConversions._ 
    return arrList.toList.iterator 
    } 

代码的休息恢复java.util.Iterator[String]

您需要的java.util.Iterator转换为scala Iterator是因为它是。

我希望答案是mapPartitions使用应该是scala.collection.Iterator,不java.util.Iterator功能的有益

+0

非常感谢.... – Tinku

+0

我的荣幸@ Tinku :)是否有效? –

+0

是的,工作完美。 – Tinku

0

返回类型。我没有看到你当前的代码的远点,但你可以使用Scala的可变集合:

import scala.collection.mutable.ArrayBuffer 

def listWords(words: Iterator[String]) : Iterator[String] = { 
    val arr = ArrayBuffer[String]() 
    while(words.hasNext) { 
    arr += words.next() 
    } 
    arr.toIterator 
} 

我个人倒只是map

def listWords(words: Iterator[String]) : Iterator[String] = { 
    // Some init code 
    words.map(someFunction) 
} 
+0

非常感谢...它的工作。 – Tinku