2017-05-18 169 views
1

我有一个火花数据框df下面的模式:星火据帧到数据帧[矢量]

root 
|-- features: array (nullable = true) 
| |-- element: double (containsNull = false) 

我想创建一个新的数据帧,每一行会的Double个向量并期望得到以下模式:

root 
    |-- features: vector (nullable = true) 

到目前为止,我有以下的代码(由这篇文章的影响:Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala),但我担心的东西是错误与我因为计算合理数量的行需要很长时间。另外,如果行数太多,应用程序将崩溃并产生堆空间异常。

val clustSet = df.rdd.map(r => { 
      val arr = r.getAs[mutable.WrappedArray[Double]]("features") 
      val features: Vector = Vectors.dense(arr.toArray) 
      features 
      }).map(Tuple1(_)).toDF() 

我怀疑在这种情况下,指令arr.toArray不是一个好的Spark练习。任何澄清都会非常有帮助。

谢谢!

回答

4

这是因为.rdd有反序列化从内部内存格式的对象,这是非常耗时。

这是确定使用.toArray - 正在以行级,不收集一切驾驶员节点。

你可以做到这一点很容易的UDF:

import org.apache.spark.ml.linalg._ 
val convertUDF = udf((array : Seq[Double]) => { 
    Vectors.dense(array.toArray) 
}) 
val withVector = dataset 
    .withColumn("features", convertUDF('features)) 

代码是从这样的回答:Convert ArrayType(FloatType,false) to VectorUTD

然而,有问题的作者并没有问差异

+0

非常感谢你,这对我有很大帮助,并将其标记为答案。我现在可以运行更多的行,并且它在时间上是令人满意的。我仍然得到一个异常:__org.apache.spark.SparkException:Kryo序列化失败:缓冲区溢出。可用:0,必需:1__当我尝试200,000行时。你会对此有所了解吗?再次感谢。 – user159941

+0

@ user159941请检查http://stackoverflow.com/questions/31947335/how-kryo-serializer-allocates-buffer-in-spark –

+1

我在我的代码设置如下:** VAL的conf =新SparkConf() 。设置(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”) .set(“spark.kryoserializer.buffer.max.mb”,“256”)**,它的工作!谢谢。 – user159941