
I have an RDD[String] read from a file. How should I convert this RDD[String] into an RDD[(String, String)]?

val file = sc.textFile("/path/to/myData.txt") 

The format of myData:

>str1_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
... 
FJDLALLLGL //the last line of str1 
>str2_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
... 
FJDLALLLGL //the last line of str2 
>str3_name 
... 

What should I do to convert the data from this file into an RDD[(String, String)]? For example:

trancRDD(
(str1_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL), 
(str2_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL), 
... 
) 

We have done something similar using a custom Hadoop input format, but it is non-trivial. If I were you, I would rather write a small program to convert the input into a Spark-friendly format. – maasg


Since the transformation you want depends on elements that come before the current one (whether the preceding line starts with ">"), it cannot simply be distributed across partitions: the preceding ">" line may not be in the same partition. As @maasg says, some preprocessing to convert the file into the right format would be better. –
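A minimal sketch of such a preprocessor, assuming plain Scala without Spark (the FlattenFasta object, the output file path, and the tab-separated output format are my own assumptions, not from the comment):

import scala.io.Source
import java.io.PrintWriter

// Hypothetical preprocessor: collapse each multi-line ">" record onto a
// single tab-separated line, so Spark can treat one line as one record.
object FlattenFasta {
  def main(args: Array[String]): Unit = {
    val out = new PrintWriter("/path/to/myData.flat.txt")
    var name: Option[String] = None
    val seq = new StringBuilder
    // write the record collected so far as "name<TAB>sequence"
    def flush(): Unit = name.foreach { n => out.println(s"$n\t$seq"); seq.clear() }
    for (line <- Source.fromFile("/path/to/myData.txt").getLines()) {
      if (line.startsWith(">")) { flush(); name = Some(line.drop(1)) }
      else seq ++= line
    }
    flush()   // don't forget the last record
    out.close()
  }
}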


Thanks, guys! @maasg Paul – fanhk

Answer


If there is a defined record delimiter, such as the ">" pointed out above, this can be done with a custom Hadoop configuration:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

// tell the Hadoop input format to split records on ">" instead of newlines 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", ">") 
// genome.txt contains the records provided in the question without the "..." 
val dataset = sc.newAPIHadoopFile("./data/genome.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
// keep only the Text value of each (offset, text) pair 
val data = dataset.map(x => x._2.toString) 

Let's take a look at the data:

data.collect 
res11: Array[String] = 
Array("", "str1_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
FJDLALLLGL 
", "str2_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
FJDLALLLGL 
") 

We can easily turn these strings into records:

val records = data.map { multiLine => 
  val lines = multiLine.split("\n") 
  (lines.head, lines.tail) 
} 
records.collect 
res14: Array[(String, Array[String])] = Array(("",Array()), 
     (str1_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL)), 
     (str2_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL))) 

(Filtering out that first empty record is left as an exercise for the reader.)
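For completeness, a hedged sketch of that final step, which also concatenates the sequence lines to produce the RDD[(String, String)] the question asks for (the pairs name is my own):

// drop the empty record produced by the leading ">", then join the 
// sequence lines of each record into a single string 
val pairs = records 
  .filter { case (name, _) => name.nonEmpty } 
  .map { case (name, seqLines) => (name, seqLines.mkString("")) } 

pairs.collect   // Array[(String, String)], e.g. (str1_name, ATCGG...FJDLALLLGL)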
