2015-11-12 81 views
3

使用PySpark的saveAsHadoopFile()时出现错误,使用saveAsSequenceFile()时出现同样的错误。我需要保存(key,val)的RDD,其中键是字符串,val是LabeledPoint RDD(标签,SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在IPython笔记本内做到这一点。我需要序列化这个大的RDD,所以我可以用Java处理它,因为Spark的MLLib功能中有一些还不适用于python。根据这post这应该是可行的。如何序列化PySpark中的LabeledPoint RDD?

望着这page我看到:

_picklable_classes = [ 
    'LinkedList', 
    'SparseVector', 
    'DenseVector', 
    'DenseMatrix', 
    'Rating', 
    'LabeledPoint', 
] 

所以我真的不知道为什么我得到这个错误。

Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')

Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

编辑:我发现这一点:

public class More ...ClassDictConstructor implements IObjectConstructor  { 
12 
13 String module; 
14 String name; 
15 
16 public More ...ClassDictConstructor(String module, String name) { 
17  this.module = module; 
18  this.name = name; 
19 } 
20 
21 public Object More ...construct(Object[] args) { 
22  if (args.length > 0) 
23   throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")"); 
24  return new ClassDict(module, name); 
25 } 
26} 

我不使用的构造()方法上面直接..所以我不知道为什么另存为...的方法,我试图通它什么时候不需要它的参数。

编辑2:下面的zero323建议(谢谢)与小毛刺工作。当我尝试使用zero323写入的内容时,出现错误(请参阅下文)。但是,当我推导出更简单的RDD时,它可以工作,并将这个简单的RDD保存到.parquet文件的目录(将其分解为多个.parquet文件)。比较简单的RDD如下:试图

simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features)) 
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file") 

错误时保存labeledDataRDD:

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row) 
    831   raise TypeError("Can not infer schema for type: %s" % type(row)) 
    832 
--> 833  fields = [StructField(k, _infer_type(v), True) for k, v in items] 
    834  return StructType(fields) 
    835 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj) 
    808    return _infer_schema(obj) 
    809   except TypeError: 
--> 810    raise TypeError("not supported type: %s" % type(obj)) 
    811 
    812 

TypeError: not supported type: <type 'numpy.unicode_'> 
+0

python还没有提供什么功能呢? –

+0

看看[这里](https://spark.apache.org/docs/1.5.1/mllib-dimensionality-reduction.html#svd-example)。我想他们会在Spark 1.6中加入这个。我使用的是最近的Spark 1.5.1。 – Kai

回答

1

问题的根源不酸洗本身。如果是这样,你不会看到net.razorvine.pickle.PickleException。如果您在saveAsSequenceFile文档看看你会看到,它需要两个步骤:

  1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
  2. Keys and values of this Java RDD are converted to Writables and written out.

你在程序的第一步失败,但即使没有,我不完全相信会是什么预期Java对象以及如何读取它。

,而不是与序列文件打我就简单的写数据作为平面文件:

from pyspark.mllib.regression import LabeledPoint 

rdd = sc.parallelize([ 
    ("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])), 
    ("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))]) 

sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file") 

读出来并转换:

import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.Vector 
import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 

val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file") 
    .select($"k", $"v.label", $"v.features") 
    .map{case Row(k: String, label: Double, features: Vector) => 
    (k, LabeledPoint(label, features))} 

rdd.sortBy(_._1, false).take(2) 

// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] = 
// Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0]))) 

或者如果你喜欢更多的Java类方法:

def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = { 
    // Vector -> org.apache.spark.mllib.linalg.Vector 
    Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2))) 
} 

sqlContext.read.parquet("a_parquet_file") 
    .select($"k", $"v.label", $"v.features") 
    .map(rowToKeyLabeledPointPair) 

编辑

一般而言,NumPy类型不作为Spark SQL中的独立值支持。如果你在RDD中有Numpy类型,你可以先将它们转换成标准的Python类型:

tmp = rdd.map(lambda kv: (str(kv[0]), kv[1])) 
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file") 
+0

你给Scala代码,我努力在Java中复制......我试图从实木复合地址文件中读取java端的数据框。 'DataFrame df = sqlContext.read()。parquet(“labeledData_parquet_file”); JavaRDD rows = df.toJavaRDD()。map(** what here here?**);'。矢量是一个mllib矢量。问题是dataFrame包含一个sql Row NOT mllib Vector,并且我需要JavaRDD(双标签,SparseVector)。 – Kai

+0

行提供了一大组类型的getter方法。你可以使用'getDouble','getString'作为key和label,'getAs'作为vector。 – zero323