2016-10-07 14 views
0

我创建2对RDD的火花遍历火花协同组()pairrdd输出阶

var pairrdd = sc.parallelize(List((1,2),(3,4),(3,6))) 
var pairrdd2 = sc.parallelize(List((3,9))) 

我施加的协同组的功能

var cogrouped = pairrdd.cogroup(pairrdd2) 

为cogroupedrdd对象类型看起来像的下方。

cogrouped: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[801] at cogroup at <console>:60 

我试图创建一个函数来遍历这些值

def iterateThis((x: Int,(x1:Iterable[Int],x2:Iterable[Int])))={ 
    println(x1.mkString(",")) 
} 

,但我会得到下面的错误。

<console>:21: error: identifier expected but '(' found. 
     def iterateThis((x: Int,(x1:Iterable[Int],x2:Iterable[Int])))={ 
       ^
+0

如何使用'cogroup'用于大型数据集就像当我用'收集()'它抛出内存溢出异常'RDD1集= rdd2.cogroup (rdd3).collect'。你可以帮助解决这个问题[https://stackoverflow.com/questions/47180307/how-to-use-cogroup-for-large-datasets]。可以分区帮助我是新来的任何帮助来解决这个问题。 – Vignesh

回答

1

你的论点(Int, (Iterable[Int], Iterable[Int]))类型:

def iterateThis(arg: (Int, (Iterable[Int], Iterable[Int]))) = { 
    val (_, (x1, _)) = arg 
    println(x1.mkString(",")) 
} 
+0

非常感谢这个超音速响应,请您详细说明为什么这是错误的def iterateThis((x:Int,(x1:Iterable [Int],x2:Iterable [Int])))) –