2015-10-05 161 views
0

如何将csv转换为Rdd [双]?我有错误:无法在该行被应用到(org.apache.spark.rdd.RDD [单位]):将Rdd [矢量]转换为Rdd [双]

val kd = new KernelDensity().setSample(rows) 

我完整的代码是在这里:

import org.apache.spark.mllib.linalg.Vectors 
    import org.apache.spark.mllib.linalg.distributed.RowMatrix 
    import org.apache.spark.mllib.stat.KernelDensity 
    import org.apache.spark.rdd.RDD 
    import org.apache.spark.{SparkContext, SparkConf} 

class KdeAnalysis { 
    val conf = new SparkConf().setAppName("sample").setMaster("local") 
    val sc = new SparkContext(conf) 

    val DATAFILE: String = "C:\\Users\\ajohn\\Desktop\\spark_R\\data\\mass_cytometry\\mass.csv" 
    val rows = sc.textFile(DATAFILE).map { 
    line => val values = line.split(',').map(_.toDouble) 
     Vectors.dense(values) 
    }.cache() 



    // Construct the density estimator with the sample data and a standard deviation for the Gaussian 
    // kernels 
    val rdd : RDD[Double] = sc.parallelize(rows) 
    val kd = new KernelDensity().setSample(rdd) 
    .setBandwidth(3.0) 

    // Find density estimates for the given values 
    val densities = kd.estimate(Array(-1.0, 2.0, 5.0)) 
} 
+0

我没有看到你在那里你可以得到'RDD [单位]任何地方'。 – zero323

回答

2

由于rows是一个RDD[org.apache.spark.mllib.linalg.Vector]以下线路不能正常工作:

val rdd : RDD[Double] = sc.parallelize(rows) 

parallelize预计Seq[T]RDD不是Seq

即使这部分工作正如你所期望的那样,你的输入是完全错误的。 KernelDensity.setSample的正确参数是RDD[Double]JavaRDD[java.lang.Double]。看起来它现在不支持多元数据。

关于从瓦的问题,你可以flatMap

rows.flatMap(_.toArray) 

,甚至更好,当你创建rows

val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache() 

,但我怀疑这真的是你所需要的。

0

准备了此代码,请评价,如果它可以帮你 - >

val doubleRDD = rows.map(_.toArray).flatMap(x => x)