2016-04-04 21 views
0

我正在学习virtualbox上的spark。我使用./bin/spark-shell打开spark并使用scala。现在我对使用scala的键值格式感到困惑。如何在spark中使用scala生成键值格式

我在家里/锋/火花/数据的txt文件,它看起来像:

panda 0 
pink 3 
pirate 3 
panda 1 
pink 4 

我用sc.textFile得到这个txt文件。如果我做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7") 

那么我可以用rdd.collect(),以显示在屏幕上RDD:

scala> rdd.collect() 
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4) 

但是,如果我这样做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt") 

其中没有” .TXT “ 这里。然后当我使用rdd.collect(),我得到一个错误:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
...... 

但我看到了其他的例子。他们都在最后有“.txt”。我的代码或系统有问题吗?

另一件事是,当我试图做的事:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x)) 
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29 
scala> rddd.collect() 
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4)) 

我打算选择数据的第一列,并以此为重点。但rddd.collect()看起来不是那种方式,因为这个词出现了两次,这是不对的。我无法继续进行其他操作,如mapbykey,reducebykey或其他操作。我在哪里做错了?

任何帮助wille真的很感激。

+0

你的问题似乎与你使用“.txt”有点不一致。你可以检查你的文字 - 并插入你的代码 - 确保它是完全正确的。如果是这样,那么你的系统似乎真的搞砸了。 – Phasmid

回答

1

就比如我创建了一个String数据集,在这之后我用线分割的记录,并使用SparkContextparallelize方法来创建一个RDD。请注意,在创建RDD后,我使用其map方法拆分存储在每条记录中的String并将其转换为。

import org.apache.spark.sql.Row 
val text = "panda 0\npink 3\npirate 3\npanda 1\npink 4" 

val rdd = sc.parallelize(text.split("\n")).map(x => Row(x.split(" "):_*)) 
rdd.take(3) 

take方法的输出是:

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3]) 

关于你的第一个问题,就没有必要对文件有任何扩展。因为在这种情况下,文件被视为纯文本。

相关问题