2015-11-19 57 views
3

我试图通过酸洗它来序列化Spark RDD,并将pickled文件直接读入Python。酸洗Spark Spark RDD并将它读入Python

a = sc.parallelize(['1','2','3','4','5']) 
a.saveAsPickleFile('test_pkl') 

然后我将test_pkl文件复制到我的本地。我如何直接将它们读入Python?

pickle.load(open('part-00000','rb')) 

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib64/python2.6/pickle.py", line 1370, in load 
    return Unpickler(file).load() 
    File "/usr/lib64/python2.6/pickle.py", line 858, in load 
    dispatch[key](self) 
    File "/usr/lib64/python2.6/pickle.py", line 970, in load_string 
    raise ValueError, "insecure string pickle" 
ValueError: insecure string pickle 

我认为火花采用酸洗法比蟒蛇咸菜方法不同(正确的是:当我尝试正常咸菜包,当我试图读取“test_pkl”的第一个泡菜部分失败我如果我错了)。有什么办法让我从Spark中腌制数据,并从文件中直接将这个pickle对象读入python中?

+1

问题是,它不是一个咸菜文件,而是一个[SequenceFile(https://wiki.apache.org/hadoop/SequenceFile)含有腌对象,我不知道有任何积极发展解析器用于Python中的SequenceFiles。 – zero323

回答

1

一个更好的方法可能是酸洗数据在每个分区中,对其进行编码,并将其写入到一个文本文件:

import cPickle 
import base64 

def partition_to_encoded_pickle_object(partition): 
    p = [i for i in partition] # convert the RDD partition to a list 
    p = cPickle.dumps(p, protocol=2) # pickle the list 
    return [base64.b64encode(p)] # base64 encode the list, and return it in an iterable 

my_rdd.mapPartitions(partition_to_encoded_pickle_object).saveAsTextFile("your/hdfs/path/") 

你的文件(S)下载到本地目录后,就可以使用下面的代码段来读取它:

# you first need to download the file, this step is not shown 
# afterwards, you can use 
path = "your/local/path/to/downloaded/files/" 
data = [] 
for part in os.listdir(path): 
    if part[0] != "_": # this prevents system generated files from getting read - e.g. "_SUCCESS" 
     data += cPickle.loads(base64.b64decode((open(part,'rb').read()))) 
+0

这里唯一的问题是加载部分需要将所有数据加载到'data'内存中,而这可能并不总是可能的。 – Tgsmith61591

+0

@ Tgsmith61591正确 - 如果您在单台计算机上读取数据,则通常无法读取群集中的所有数据。要解决这个问题,您可能需要从文件中过滤/缩小/提取所需的数据,例如'data + = some_filter_fx(cPickle.loads(base64.b64decode((open(part,'rb')。read()))))'' – mgoldwasser

1

问题是格式不是一个pickle文件。它是一个SequenceFile的酸渍objectssequence file可以在Hadoop和Spark环境中打开,但不打算在python中使用,并使用基于JVM的序列化进行序列化,在这种情况下是字符串列表。

1

可以使用sparkpickle项目。就这么简单

with open("/path/to/file", "rb") as f: 
    print(sparkpickle.load(f))