2017-02-20 126 views

Convert a Spark DataFrame to org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

I am new to Scala and Spark 2.1. I am trying to compute the correlations between the elements of a DataFrame that looks like this:

item_1 | item_2 | item_3 | item_4
     1 |      1 |      4 |      3
     2 |      0 |      2 |      0
     0 |      2 |      0 |      1

Here is what I tried:

val df = sqlContext.createDataFrame(
    Seq((1, 1, 4, 3),
        (2, 0, 2, 0),
        (0, 2, 0, 1))
).toDF("item_1", "item_2", "item_3", "item_4")


val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0)) 

And to calculate the correlation:

val correlMatrix: Matrix = Statistics.corr(items, "pearson") 

This fails with the following error message:

<console>:89: error: type mismatch; 
found : org.apache.spark.rdd.RDD[Seq[Double]] 
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] 
     val correlMatrix: Matrix = Statistics.corr(items, "pearson") 

I don't know how to create an org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] from the DataFrame.

This is probably a very simple task, but I am struggling a bit and would appreciate any advice.

Answers


You can use VectorAssembler, for example. Assemble the vectors and convert to an RDD:

import org.apache.spark.ml.feature.VectorAssembler 

val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs") 
    .transform(df) 
    .select("vs") 
    .rdd 

Extract the Vectors:

  • Spark 1.x:

    rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0)) 
    
  • Spark 2.x:

    rows 
        .map(_.getAs[org.apache.spark.ml.linalg.Vector](0)) 
        .map(org.apache.spark.mllib.linalg.Vectors.fromML) 
    

Regarding your code:

  • The columns are Integer, not Double
  • The data is not an array, so you cannot use _.getSeq[Double](0)
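As a cross-check on this answer, the Pearson coefficient that Statistics.corr(items, "pearson") computes for each column pair can be sketched in plain Scala, with no Spark dependency. `pearson` and `CorrSketch` below are illustrative names, not Spark APIs; the rows are the ones from the question:

```scala
// A Spark-free sketch of the Pearson correlation matrix that
// Statistics.corr(items, "pearson") would produce for the question's data.
// `pearson` and `CorrSketch` are illustrative names, not Spark APIs.
object CorrSketch {
  // Sample Pearson coefficient for two equally long columns.
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    val n = x.length.toDouble
    val mx = x.sum / n
    val my = y.sum / n
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx  = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy  = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    cov / (sx * sy)
  }

  def main(args: Array[String]): Unit = {
    // The rows from the question, one Seq per row (item_1 .. item_4).
    val rows = Seq(
      Seq(1.0, 1.0, 4.0, 3.0),
      Seq(2.0, 0.0, 2.0, 0.0),
      Seq(0.0, 2.0, 0.0, 1.0)
    )
    val cols = rows.transpose
    // Correlate every column with every other column; the diagonal is 1.0.
    val corrMatrix = cols.map(a => cols.map(b => pearson(a, b)))
    corrMatrix.foreach(row => println(row.map(v => f"$v%+.4f").mkString("  ")))
  }
}
```

Running this on the three rows above reproduces the matrix entries that the mllib call returns, which makes it easy to sanity-check the VectorAssembler pipeline on small data.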
Thank you very much, this is exactly the solution I was looking for – Duesentrieb


If your goal is to perform a Pearson correlation, you don't really have to use RDDs and Vectors. Here is an example of performing a Pearson correlation directly on DataFrame columns (the columns in question are of type Double).

Code:

import org.apache.spark.sql.{SQLContext, Row, DataFrame} 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType} 
import org.apache.spark.sql.functions._ 


val rb = spark.read
    .option("delimiter", "|")
    .option("header", "false")
    .option("inferSchema", "true")
    .format("csv")
    .load("rb.csv")
    .toDF("name", "beerId", "brewerId", "abv", "style", "appearance", "aroma",
          "palate", "taste", "overall", "time", "reviewer")
    .cache()

rb.agg(
    corr("overall","taste"), 
    corr("overall","aroma"), 
    corr("overall","palate"), 
    corr("overall","appearance"), 
    corr("overall","abv") 
    ).show() 

In this example, I load the DataFrame (with a custom delimiter, no header, and inferred data types), then simply call the agg function on the DataFrame, passing it multiple correlations.



Output:

+--------------------+--------------------+---------------------+-------------------------+------------------+
|corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
+--------------------+--------------------+---------------------+-------------------------+------------------+
|  0.8762432795943761|   0.789023067942876|   0.7008942639550395|       0.5663593891357243|0.3539158620897098|
+--------------------+--------------------+---------------------+-------------------------+------------------+

As you can see from the results, the (overall, taste) columns are highly correlated, while (overall, abv) is much less so.

Here is a link to the Scala Docs DataFrame page, which has the aggregation correlation function.

Thank you for this approach. It does the job, but I have more than 300 columns to compute – Duesentrieb

Is there a way to calculate over many columns without explicitly defining each combination? – Duesentrieb
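On this follow-up question: the column pairs can be generated programmatically instead of being written out. A minimal sketch in plain Scala; `corrPairs` and `PairSketch` are hypothetical helper names, and only the commented lines at the end assume a Spark DataFrame:

```scala
// Sketch: generate every unordered column pair instead of listing them by hand.
// For 300 columns this yields 300 * 299 / 2 = 44850 pairs.
// `corrPairs` is a hypothetical helper; only the commented lines need Spark.
object PairSketch {
  def corrPairs(cols: Seq[String]): Seq[(String, String)] =
    cols.combinations(2).map { case Seq(a, b) => (a, b) }.toSeq

  def main(args: Array[String]): Unit = {
    val pairs = corrPairs(Seq("overall", "taste", "aroma", "abv"))
    pairs.foreach(println)
    // With a Spark DataFrame `df`, this could become (untested sketch):
    //   import org.apache.spark.sql.functions.corr
    //   val exprs = pairs.map { case (a, b) => corr(a, b) }
    //   df.agg(exprs.head, exprs.tail: _*).show()
  }
}
```

Note that a single agg over tens of thousands of corr expressions may be slow; for a full matrix, the RDD[Vector] route with Statistics.corr from the accepted answer computes all pairs in one pass.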