1

我需要一个RDD转换为单个柱o.a.s.ml.linalg.Vector数据帧中,为了使用ML算法,特别KMEANS对于这种情况。这是我的RDD:(阵列/ ML矢量/ MLlib矢量)RDD到ML矢量数据帧coulmn

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.mllib.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble)))) 

我试图做什么this答案没有运气所暗示的,我想是因为你结束了一个mllib载体,其上运行的算法时抛出一个不匹配错误。现在,如果我改变了:

import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 

val schema = new StructType() 
    .add("features", new VectorUDT()) 

这样:

import org.apache.spark.ml.linalg.{Vectors, VectorUDT} 

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.ml.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble)))) 

val schema = new StructType() 
    .add("features", new VectorUDT()) 

因为ML VectorUDT是私人的,我会得到一个错误。

我ALGO试图转换RDD为双打,以数据帧的数组,并获得ML密集向量是这样的:

var parsedData = sc.textFile("/home/pililo/Documents/Mi_Memoria/Codigo/Datasets/Digits/digits480x.csv").map(s => Row(s.split(',').slice(0,64).map(_.toDouble))) 

parsedData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 

val schema2 = new StructType().add("features", ArrayType(DoubleType)) 

schema2: org.apache.spark.sql.types.StructType = StructType(StructField(features,ArrayType(DoubleType,true),true)) 

val df = spark.createDataFrame(parsedData, schema2) 

df: org.apache.spark.sql.DataFrame = [features: array<double>] 

val df2 = df.map{ case Row(features: Array[Double]) => Row(org.apache.spark.ml.linalg.Vectors.dense(features)) } 

会抛出下面的错误,即使spark.implicits._输入:

error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 

任何帮助,非常感谢,谢谢!

回答

1

从我头顶的:

  1. 使用csv源和VectorAssembler

    ​​
  2. 使用text源和UDF:

    def parse_(n: Int, m: Int)(s: String) = Try(
        Vectors.dense(s.split(',').slice(n, m).map(_.toDouble)) 
    ).toOption 
    
    def parse(n: Int, m: Int) = udf(parse_(n, m) _) 
    
    val raw = spark.read.text(path) 
    
    raw.select(parse(n, m)(col(raw.columns.head)).alias("features")) 
    
  3. 使用text源并放下包装

    spark.read.text(path).as[String].map(parse_(n, m)).toDF 
    
+0

哇感谢您的回答,我给他们一个尝试。任何想法,如果1)将是最有效的?其实我是做这样的事情,但缺少切片输入的cols列的方式,因为他们是64个。另外我会很感激,如果你可以,如果你能解释一下这部分1):exprs:_ *。是否像选择所有列?非常感谢! – Pilailou

+0

2和3可稍快,因为没有涉及到CSV解析,但我不会注重这一点。 1.可以通过向读者提供架构来改进。最后'_ *'是关于[varargs](http://stackoverflow.com/q/1008783/1560062)。它接受一个序列并将其“解压”为select的参数。 – zero323

+0

好我会再看看,再次感谢! – Pilailou