2016-02-29 59 views
0

在Spark中,我有一个RDD,其中包含数百万到本地文件的路径(我们有一个共享的文件系统,因此它们在本地显示)。在Scala中,我将如何创建一个由每个文件中所有行组成的RDD?在Spark中读取数百万本地文件

我试图做这样的事情:

paths.flatMap(path => sc.textFile(path)) 

但没有奏效。我也试过这样的:

paths.flatMap(path => 
    scala.io.Source.fromInputStream(new java.io.FileInputStream(path)).getLines 
) 

这工作时本地工作,但没有在多台机器上运行时。我结束了这个错误:

java.nio.charset.MalformedInputException: Input length = 1 
    at java.nio.charset.CoderResult.throwException(CoderResult.java:277) 

任何指针将不胜感激

(大多数解决方案点至今涉及通过文件的列表sc.textFile全部一次,这是不可能的,因为名单可能非常大,现在一个典型的用例会产生20M的路径,这不适合单个Java String)。

回答

2

如果他们在一个目录,那么它可能会更好看整个目录

sc.textFile("/path/to/directory/") 

将合并所有文件到一个单一的RDD,寻找出MEM约束。或者你可以尝试的地图,则减少:

paths.map(path => sc.textFile(path)).reduce(_.union(_)) 

或者甚至zero323建议更好:

SparkContext.union(paths.map(...)) 
+0

@ zero323哦,我不知道,很高兴知道,我认为最大字符串长度是65535个字符,但是2^31-1大约是2个字符,假设每个路径是20个字符,它应该是足够的。 – GameOfThrows

+0

谢谢:)我可以用'SparkContext.union(paths.map(...))'来替换'.reduce(_。union(_))'吗?这真的会产生__huge差异_。 (如果你这样做,请从维基答案中删除最后一段) – zero323

+0

@ zero323哇,我没想到这会更快,但它确实有很大的不同,你介意简单解释一下为什么?这与数据混洗有关吗? – GameOfThrows

2

此:

paths.flatMap(path => sc.textFile(path)) 

根本无法编译何况工作,因为RDDS不TraversableOnce

直接读取文件时看到的错误(java.nio.charset.MalformedInputException)与Spark不相关,并在文件编码不正确时引发。引述MalformedInputException documentation

Checked exception thrown when an input byte sequence is not legal for given charset, or an input character sequence is not a legal sixteen-bit Unicode sequence. You can solve it providing a codec for fromInputStream method:

def fromInputStream(is: InputStream)(implicit codec: Codec) 

并使用Codec.onMalformedInput具有所需CodingErrorAction

参见例如Source.fromInputStream exception handling during reading lines

此外,当您直接读取数据时,例如通过用Try包装读取块,您应该处理IO异常。

Spark本身支持读取完整的目录树。没有理由传递个别文件,只有顶级目录列表并使用mapreduce.input.fileinputformat.input.dir.recursive配置的正确设置。也可以通过多根目录作为一个逗号分隔的字符串:

sc.textFile("/foo,/bar") 

您也可以使用通配符来读取文件和目录的任意列表:

sc.textFile(Seq(
    "/foo/bar/*", 
    "/bar/*/*.txt" 
).mkString(",")) 

最后阅读大量的小由于计算输入分割的方式,文件效率低下。而不是使用textFiles,你应该考虑与CombineFileInputFormat例如一个子类阅读:

sc.hadoopFile(
    paths, 
    classOf[CombineTextInputFormat], 
    classOf[LongWritable], classOf[Text] 
).map(_._2.toString) 

最后,你可以通过@GameOfThrows建议,但它不应该在没有检查点to avoid issues with long lineages反复做union多个RDDS。改为使用SparkContext.union并控制分区数量。

相关问题