使用PySpark的saveAsHadoopFile()时出现错误,使用saveAsSequenceFile()时出现同样的错误。我需要保存(key,val)的RDD,其中键是字符串,val是LabeledPoint RDD(标签,SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在IPython笔记本内做到这一点。我需要序列化这个大的RDD,所以我可以用Java处理它,因为Spark的MLLib功能中有一些还不适用于python。根据这post这应该是可行的。如何序列化PySpark中的LabeledPoint RDD?
望着这page我看到:
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
所以我真的不知道为什么我得到这个错误。
Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')
Error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
编辑:我发现这一点:
public class More ...ClassDictConstructor implements IObjectConstructor {
12
13 String module;
14 String name;
15
16 public More ...ClassDictConstructor(String module, String name) {
17 this.module = module;
18 this.name = name;
19 }
20
21 public Object More ...construct(Object[] args) {
22 if (args.length > 0)
23 throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24 return new ClassDict(module, name);
25 }
26}
我不使用的构造()方法上面直接..所以我不知道为什么另存为...的方法,我试图通它什么时候不需要它的参数。
编辑2:下面的zero323建议(谢谢)与小毛刺工作。当我尝试使用zero323写入的内容时,出现错误(请参阅下文)。但是,当我推导出更简单的RDD时,它可以工作,并将这个简单的RDD保存到.parquet文件的目录(将其分解为多个.parquet文件)。比较简单的RDD如下:试图
simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")
错误时保存labeledDataRDD:
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
831 raise TypeError("Can not infer schema for type: %s" % type(row))
832
--> 833 fields = [StructField(k, _infer_type(v), True) for k, v in items]
834 return StructType(fields)
835
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
808 return _infer_schema(obj)
809 except TypeError:
--> 810 raise TypeError("not supported type: %s" % type(obj))
811
812
TypeError: not supported type: <type 'numpy.unicode_'>
python还没有提供什么功能呢? –
看看[这里](https://spark.apache.org/docs/1.5.1/mllib-dimensionality-reduction.html#svd-example)。我想他们会在Spark 1.6中加入这个。我使用的是最近的Spark 1.5.1。 – Kai