2016-08-19

Cannot print RDD: I am trying to print the contents of my RDD, of type RDD[(String,List[(String,String)])]:

val sc = new SparkContext(conf) 
val splitted = rdd.map(line => line.split(",")) 
val processed = splitted.map(x=>(x(1),List((x(0),x(2),x(3),x(4))))) 
val grouped = processed.reduceByKey((x,y) => (x ++ y)) 
System.out.println(grouped) 

However, instead of the contents I see:

ShuffledRDD[4] at reduceByKey at Consumer.scala:88

UPDATE:

Contents of the txt file:

100001082016,230,111,1,1 
100001082016,121,111,1,1 
100001082016,110,111,1,1 

UPDATE 2 (whole code):

class Consumer() 
{ 

def run() = { 
    val conf = new SparkConf() 
           .setAppName("TEST") 
           .setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    val rdd = sc.textFile("file:///usr/test/myfile.txt") 
    val splitted = rdd.map(line => line.split(",")) 
    val processed = splitted.map(x=>(x(1),List((x(0),x(2),x(3),x(4))))) 
    val grouped = processed.reduceByKey((x,y) => (x ++ y)) 
    System.out.println(grouped) 
} 

} 

What does the rest of the stacktrace say? –


And in Scala, you would do 'println(grouped.collect())'. No need for System.out –


@cricket_007: In that case I get '[Lscala.Tuple2;@5377414a'. The rest of the stack is standard Spark output, e.g. '16/08/19 13:49:39 INFO DAGScheduler: Job 0 finished: collect at Consumer.scala:89, took 0.519500 s' and so on. – HackerDuck

Answer


There is no problem here:

scala> val rdd = sc.parallelize(Seq("100001082016,230,111,1,1","100001082016,121,111,1,1","100001082016,110,111,1,1")) 
// rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val splitted = rdd.map(line => line.split(",")) 
// splitted: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:29 

scala> val processed = splitted.map(x=>(x(1),List((x(0),x(2),x(3),x(4))))) 
// processed: org.apache.spark.rdd.RDD[(String, List[(String, String, String, String)])] = MapPartitionsRDD[2] at map at <console>:31 

scala> val grouped = processed.reduceByKey((x,y) => (x ++ y)) 
// grouped: org.apache.spark.rdd.RDD[(String, List[(String, String, String, String)])] = ShuffledRDD[3] at reduceByKey at <console>:33 

scala> grouped.collect().foreach(println) 
// (121,List((100001082016,111,1,1))) 
// (110,List((100001082016,111,1,1))) 
// (230,List((100001082016,111,1,1))) 

This part is what is wrong. It does work as expected, but you have to understand the language to know what to expect:

scala> System.out.println(grouped) 
// ShuffledRDD[3] at reduceByKey at <console>:33 

EDIT: Just to be clear, if you want to print a collection, you need to use the mkString method available on the collection you want to print, to convert it into the format you want.
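For instance, a minimal sketch of that approach using plain Scala collections (mkString behaves the same on the Array returned by collect(); the sample data below mirrors the txt file in the question and is only illustrative):

```scala
// Sample data shaped like the grouped result above: key -> list of tuples.
// mkString works on any Scala collection, including the Array that
// RDD.collect() returns, so the same formatting applies after a collect().
val grouped = List(
  ("230", List(("100001082016", "111", "1", "1"))),
  ("121", List(("100001082016", "111", "1", "1")))
)

// Format each (key, values) pair, then join the lines with mkString.
val pretty = grouped
  .map { case (key, values) => s"$key -> ${values.mkString(", ")}" }
  .mkString("\n")

println(pretty)
```

Unlike the default toString of an Array (which gives the '[Lscala.Tuple2;@...' form), mkString lets you choose the separator, prefix, and suffix explicitly.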


'grouped.collect()' works in the console because the console specifically handles printing arrays. 'println(grouped.collect())' _doesn't_ work as desired: try 'println(Array(1,2))'. –


@AlexeyRomanov I didn't say 'println(grouped.collect)'. Still, 'println(grouped)' will use the 'toString' method of 'ShuffledRDD'. I think we both agree on that, thus it works as expected. – eliasah


@AlexeyRomanov 'grouped.collect' here is just an example; maybe I should remove it to avoid confusion. – eliasah