2017-10-18 126 views
1

矩阵的行数和列数都未知,如何使用 Scala/Spark 将矩阵转换为 DataFrame?

一个示例矩阵(Matrix)如下:

[5,1.3] 
[1,5.2] 

我想将它转换为 DataFrame,列名可以是任意的,该如何实现?这是我期望的结果:

+-------------+----+ 
    |   _1 | _2 | 
    +-------------+----+ 
    |5   |1.3 | 
    |1   |5.2 | 
    -------------------- 

回答

1

我建议先将矩阵转换为 RDD,再将 RDD 转换为 DataFrame。这不是最好的办法,但在 Spark 2.0.0 中可以正常工作。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
/**
 * Example: convert an mllib dense matrix into a DataFrame whose columns
 * are named `_1 .. _n` (matching the output shown in the question).
 *
 * Fixes over the original answer:
 *  - one SparkSession only; the original built a SparkContext with master
 *    "local[1]" AND a SparkSession with master "local", relying on
 *    getOrCreate silently reusing the first context.
 *  - the row-to-tuple step `case Array(p0, p1)` hard-coded exactly two
 *    columns and threw MatchError otherwise; the schema is now derived
 *    from the matrix dimensions, so any shape works.
 *  - the session is stopped on exit.
 */
object mat2df {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("mat2df")
      .master("local[1]")
      .getOrCreate()
    val sc = spark.sparkContext

    // DenseMatrix stores values column-major: col0 = (5, 1), col1 = (1.3, 5.2).
    val values = Array(5, 1, 1.3, 5.2)
    val mat = Matrices.dense(2, 2, values).asInstanceOf[DenseMatrix]

    // Matrix -> RDD of row vectors: regroup the column-major array into
    // columns, transpose to rows, wrap each row in a DenseVector.
    def toRDD(m: Matrix): RDD[Vector] = {
      val columns = m.toArray.grouped(m.numRows)
      val rows = columns.toSeq.transpose
      val vectors = rows.map(row => new DenseVector(row.toArray))
      sc.parallelize(vectors)
    }

    // RDD[Vector] -> RDD[Row], plus an explicit schema named `_1 .. _n`
    // so the printed column headers match the original tuple-based output.
    val rowRdd = toRDD(mat).map(v => Row.fromSeq(v.toArray.toSeq))
    val schema = StructType(
      (1 to mat.numCols).map(i => StructField(s"_$i", DoubleType, nullable = false))
    )

    val df = spark.createDataFrame(rowRdd, schema)
    df.show()

    spark.stop()
  }
}
1
/**
 * Converts an mllib `Matrix` into a DataFrame, one DataFrame row per matrix
 * *column* (it iterates `colIter`), i.e. the result is the transpose of the
 * matrix. Columns are named `<m_nodeColName>_0 .. <m_nodeColName>_(numRows-1)`.
 *
 * Fixes over the original answer:
 *  - the original declared `val sc = new SQLContext(...)` which shadowed the
 *    `sc: SparkContext` parameter AFTER `sc.parallelize` had already used it
 *    (an illegal forward reference), and referenced an undefined `nodeContext`.
 *  - `matrix.rowIter.size` materialized every row vector just to count them;
 *    `matrix.numRows` is the same number in O(1).
 *  - the unused `ids` buffer is removed.
 *
 * @param sc            context used to parallelize the matrix columns
 * @param matrix        source matrix (dense or sparse)
 * @param m_nodeColName prefix for the generated column names
 * @return DataFrame with `matrix.numRows` double columns and
 *         `matrix.numCols` rows
 */
def matrixToDataFrame(sc: SparkContext, matrix: Matrix, m_nodeColName: String): DataFrame = {
  // Each column vector becomes one Row of doubles.
  val rowRdd = sc.parallelize(matrix.colIter.toSeq).map { col =>
    Row.fromSeq(col.toArray.toSeq)
  }

  // One schema field per matrix row (each emitted Row has numRows entries).
  val schema = StructType(
    (0 until matrix.numRows).map { i =>
      StructField(m_nodeColName + "_" + i.toString, DoubleType, true)
    }
  )

  // Reuse the session backing the supplied SparkContext instead of building
  // a fresh (deprecated) SQLContext from an unrelated context.
  val spark = SparkSession.builder.getOrCreate()
  spark.createDataFrame(rowRdd, schema)
}