2016-04-25 47 views
0

我需要从HDFS很多的gzip的阅读,是这样的:(“* GZ”) sc.textFile 而其中一些的gzip的损坏,提高如何跳过损坏的gzips与pyspark?

产生java.io.IOException: gzip流CRC失败

停止整个处理运行。

我读了辩论here,其中有人有相同的需求,但没有得到明确的解决方案。由于在火花内实现此功能是不合适的(根据链接),有什么方法可以粗暴地跳过损坏的文件吗?似乎有scala用户的提示,不知道如何在python中处理它。

或者我只能检测到损坏的文件,并删除它们?

如果我有大量的gzip,并且在跑完一天之后发现最后一个已损坏,该怎么办。整天浪费了。并且损坏的gzip很常见。

回答

0

您可以手动列出所有文件,然后通过地图UDF中的文件进行读取。然后,UDF可以尝试/除了块来处理损坏的文件。

的代码会看起来像

import gzip 
from pyspark.sql import Row 

def readGzips(fileLoc): 
    try: 
     ... 
     code to read file 
     ... 
     return record 
    except: 
     return Row(failed=fileLoc) 

from os import listdir 
from os.path import isfile, join 
fileList = [f for f in listdir(mypath) if isfile(join(mypath, f))] 

pFileList = sc.parallelize(fileList) 
dataRdd = pFileList.map(readGzips).filter((lambda x: 'failed' not in x.asDict())) 
+0

所以“代码读取文件”的部分是使用Python的gzip的模块,以解压缩文件?输入是路径字符串的列表,我的文件在hdfs上,我如何使用原始python来读取它们? – zhangcx93

+0

你问如何从hdfs读取文件?或者如何读取gzip? http://stackoverflow.com/questions/10566558/python-read-lines-from-compressed-text-files描述如何使用gzip。而hdfs应该可以通过“hdfs://path/to/file.gz”获得, – David

+0

我的意思是在readGzips()中,只能使用python从hdfs访问文件,外部包,因为open('hdfs://file')会引发IOError。另外,以这种方式,mesos和spark不能正确指定工作以使机器具有文件来处理数据,因此比使用sc.textFile()正常读取文件的方式慢。 – zhangcx93