2016-11-30 160 views
10

我有一个RDD结构火花:RDD列出

RDD[(String, String)] 

和我想创建2名列表(一个用于RDD的每个维度)。我试图使用rdd.foreach()并填充两个ListBuffers,然后将它们转换为列表,但我猜每个节点都会创建自己的ListBuffer,因为迭代后BufferLists是空的。我该怎么做 ?

编辑:我的做法

val labeled = data_labeled.map { line => 
    val parts = line.split(',') 
    (parts(5), parts(7)) 
}.cache() 

var testList : ListBuffer[String] = new ListBuffer() 

labeled.foreach(line => 
    testList += line._1 
) 
    val labeledList = testList.toList 
    println("rdd: " + labeled.count) 
    println("bufferList: " + testList.size) 
    println("list: " + labeledList.size) 

,其结果是:

rdd: 31990654 
bufferList: 0 
list: 0 
+1

请用你的代码更新已经尝试过和一些输入数据样本和预期输出!你的问题对我来说不是很清楚。 – eliasah

回答

9

如果你真的要创建两个列表 - 这意味着,你希望所有的分布式数据被收集到驱动程序应用程序(冒险缓慢或OutOfMemoryError) - 您可以使用collect,然后使用简单的map操作结果:

val list: List[(String, String)] = rdd.collect().toList 
val col1: List[String] = list.map(_._1) 
val col2: List[String] = list.map(_._2) 

或者 - 如果你想“分”你的RDD为两个RDDS - 它没有数据收集相当类似:

rdd.cache() // to make sure calculation of rdd is not repeated twice 
val rdd1: RDD[String] = rdd.map(_._1) 
val rdd2: RDD[String] = rdd.map(_._2) 

第三种方法是首先映射到这两个RDDS和然后收集其中的每一个,但与第一个选项没有多大区别,并且遭受相同的风险和限制。

+0

@Yuriy这里有关广播变量(这是只读的)吗?你能更详细地描述它吗? – avr

+0

@avr ListBuffer是可变的,'+ ='突变内部状态,不会创建新的引用。但是你的问题是好的,对于不可变的语句(其中引用改变任何操作)需要用一些东西(Serializable)来包装它。 List的简单示例:'val testList = sc.broadcast(new Serializable {var list = List.empty [String]})',并且在mutate内部状态之后。 – Yuriy

+0

@Yuriy我认为avr是正确的,你误解了他/她的问题 - 这不是一个可变的与不可变的收集问题 - 广播变量是只读的 - 从某种意义上说,如果它们的值在执行程序上发生变化,不会看到这种改变(Spark将如何汇总所有执行者所做的更改?)。这在本地模式下工作的事实看起来大多像一个错误,它不会在集群实际分布的地方工作。 –

1

至于Tzach琐的回答另外,您也可以使用unzip的列表:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d"))) 
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val (l1, l2) = myRDD.collect.toList.unzip 
l1: List[String] = List(a, c) 
l2: List[String] = List(b, d) 
RDD小号

或者keysvalues

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values) 
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33 
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33 

scala> rdd1.foreach{println} 
a 
c 

scala> rdd2.foreach{println} 
d 
b