2016-03-17

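Split key and value in map (Scala)

I don't know whether it's possible, but I want to split the variable "a" into two lists inside my mapPartitions: one list, l, that stores all the numbers, and another list, let's say b, that stores all the words. Something like a.mapPartitions((p,v) =>{ val l = p.toList; val b = v.toList; ....}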

For example, in my for loop I would have l(i) = 1 and b(i) = "score".

import scala.io.Source
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer

val a = sc.parallelize(List(("score",1),("chicken",2),("magnacarta",2)))

a.mapPartitions(p => {
  val l = p.toList
  val ret = new ListBuffer[Int]
  val words = new ListBuffer[String]
  for (i <- 0 to l.length - 1) {
    words += b(i) // b is meant to hold the words, but it is not defined yet
    ret += l(i)   // l(i) is a (String, Int) pair here, not an Int
  }
  ret.toList.iterator
})

'a.map(_._1); a.map(_._2)'? – zero323
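A minimal sketch of the comment's suggestion, assuming a plain Scala List as a hypothetical local stand-in for the RDD `a` so that it runs without Spark. On an actual pair RDD, `a.map(_._1)` and `a.map(_._2)` (or `a.keys` and `a.values`) return two new RDDs instead of Lists, and calling `.collect()` on each should give arrays whose indices line up. The object name `SplitPairs` is illustrative only.

```scala
// Hypothetical local stand-in for the RDD `a` from the question.
object SplitPairs {
  val a: List[(String, Int)] = List(("score", 1), ("chicken", 2), ("magnacarta", 2))

  // Two passes, as in the comment: keys in one list, values in the other
  val b: List[String] = a.map(_._1)
  val l: List[Int] = a.map(_._2)

  // Single-pass alternative on a local collection: unzip the pairs
  val (words, numbers) = a.unzip
}
```

With this, `SplitPairs.l(0)` is `1` and `SplitPairs.b(0)` is `"score"`, matching the l(i)/b(i) pairing the question asks for.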

Answer

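Spark is a distributed computing engine: operations such as mapPartitions run separately on each partition of the data, on the cluster nodes. To combine the per-partition results into a single result, you then need an aggregating action such as reduce().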
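To illustrate this two-step shape, here is a local simulation with no Spark dependency. Each inner List is a hypothetical partition, `summarize` plays the role of the function passed to mapPartitions, and `merge` plays the role of the function passed to reduce. The partition split and all names here are assumptions for illustration, not part of Spark's API.

```scala
// Local simulation of "per-partition work, then reduce" (no Spark needed).
object PartitionThenReduce {
  // One (words, numbers) result per partition
  type Acc = (List[String], List[Int])

  // Hypothetical split of the question's data into two partitions
  val partitions: List[List[(String, Int)]] =
    List(List(("score", 1)), List(("chicken", 2), ("magnacarta", 2)))

  // Per-partition step: separate one partition's pairs into words and numbers
  def summarize(part: List[(String, Int)]): Acc = part.unzip

  // Combine step: concatenate two partial results
  def merge(x: Acc, y: Acc): Acc = (x._1 ++ y._1, x._2 ++ y._2)

  // Summarize every partition, then fold the partial results into one
  val (words, numbers) = partitions.map(summarize).reduce[Acc](merge)
}
```

Here `words` ends up as `List("score", "chicken", "magnacarta")` and `numbers` as `List(1, 2, 2)`, because a local List is reduced left to right.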

The following code should do what you want:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SimpleApp {

  // Serializable container that collects words and numbers side by side
  class MyResponseObj(var numbers: List[Int] = List[Int](), var words: List[String] = List[String]()) extends java.io.Serializable {
    // Add a single (word, number) pair
    def +=(str: String, int: Int) = {
      numbers = numbers :+ int
      words = words :+ str
      this
    }

    // Merge another partition's result into this one
    def +=(other: MyResponseObj) = {
      numbers = numbers ++ other.numbers
      words = words ++ other.words
      this
    }

  }


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(List(("score", 1), ("chicken", 2), ("magnacarta", 2)))

    // Build one MyResponseObj per partition, then merge them with reduce
    val myResponseObj = a.mapPartitions[MyResponseObj](it => {
      val myResponseObj = new MyResponseObj()
      it.foreach {
        case (str: String, int: Int) => myResponseObj += (str, int)
        case _ => println("unexpected data")
      }
      Iterator(myResponseObj)
    }).reduce((myResponseObj1, myResponseObj2) => myResponseObj1 += myResponseObj2)

    println(myResponseObj.words)
    println(myResponseObj.numbers)

    sc.stop()
  }
}

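Thanks, nice solution, but I don't know how to run it. I tried :load test.scala (test.scala being the name of my file), but it only reports that everything loaded fine. (I'm using the terminal on Windows and writing the code in Notepad++.) – MarcelRitos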

I would recommend downloading an IDE such as IntelliJ or Eclipse and installing a Scala plugin. –