10

我想制作libsvm格式,所以我将dataframe制作成所需的格式,但我不知道如何转换为libsvm格式。格式如图所示。我希望所需的libsvm类型是用户项目:评分。如果您知道在当前形势下做什么:如何从DataFrame将数据准备到LibSVM格式?

val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line => 
    val fields = line.split(",") 
     (fields(0).toInt,fields(1).toInt,fields(2).toDouble) 
} 
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))} 
val usergroup = user.groupByKey 

val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)} 

val data_DF = data.toDF("user","item","rating") 

DATAFRAME FIGURE

+0

这火花的版本,您使用的? – eliasah

+0

使用版本2.0! –

回答

11

你面对可以分为以下几个问题:

  • 转换你的收视率(我相信)为LabeledPoint数据X
  • 将X保存为libsvm格式。

1.您转换成收视数据LabeledPointX

让我们考虑以下原料等级:

val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5") 

您可以处理那些原评级为coordinate list matrix (COO)

Spark实现了由其条目RDD支持的分布式矩阵:CoordinateMatrix其中每个条目是(i:Long,j:Long,值:Double)的元组。

注意:只有当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix。(这通常是用户/项目的评分的情况下。)

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 
import org.apache.spark.rdd.RDD 

val data: RDD[MatrixEntry] = 
     sc.parallelize(rawRatings).map { 
      line => { 
        val fields = line.split(",") 
        val i = fields(0).toLong 
        val j = fields(1).toLong 
        val value = fields(2).toDouble 
        MatrixEntry(i, j, value) 
      } 
     } 

现在让我们转换该RDD[MatrixEntry]CoordinateMatrix并提取索引行:

val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix 
       .toIndexedRowMatrix().rows // Extract indexed rows 
       .toDF("label", "features") // Convert rows 

2.保存LabeledPoint数据在libsvm格式

由于Spark 2.0,您可以使用DataFrameWriter来做到这一点。让我们来创建一些虚拟LabeledPoint数据一个小例子(你也可以用我们前面创建的DataFrame):

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.mllib.regression.LabeledPoint 
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) 
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) 

val df = Seq(neg,pos).toDF("label","features") 

不幸的是,我们仍然无法使用DataFrameWriter直接,因为虽然大多数管道组件支持加载向后兼容, 2.0版本之前的Spark版本中的一些现有DataFrames和管道(包含矢量或矩阵列)可能需要迁移到新的spark.ml矢量和矩阵类型。

实用程序,用于将数据帧列从mllib.linalgml.linalg类型(反之亦然)可以在org.apache.spark.mllib.util.MLUtils.在我们的案例中找到我们需要做以下(对于虚拟数据和DataFramestep 1.

import org.apache.spark.mllib.util.MLUtils 
// convert DataFrame columns 
val convertedVecDF = MLUtils.convertVectorColumnsToML(df) 

现在,让我们保存数据框:

convertedVecDF.write.format("libsvm").save("data/foo") 

我们可以检查文件内容:

$ cat data/foo/part* 
0.0 1:1.0 3:3.0 
1.0 1:1.0 2:0.0 3:3.0 

编辑: 在火花的当前版本(2.1.0),就没有必要使用mllib包。你可以简单地在LIBSVM保存LabeledPoint数据,如下面格式:

import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.feature.LabeledPoint 
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) 
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) 

val df = Seq(neg,pos).toDF("label","features") 
df.write.format("libsvm").save("data/foo") 
+0

还有一个问题!如何将稀疏形式的数据框放在标签点上?最后,它应该以用户项目的形式:评级。 –

+0

好的,谢谢。对不起,我的不干净的代码:( –

+1

哦..谢谢!我期待你的建议。谢谢你的回复。 –