你面对可以分为以下几个问题:
- 转换你的收视率(我相信)为
LabeledPoint
数据X。
- 将X保存为libsvm格式。
1.您转换成收视数据LabeledPoint
X
让我们考虑以下原料等级:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
您可以处理那些原评级为coordinate list matrix (COO)。
Spark实现了由其条目RDD支持的分布式矩阵:CoordinateMatrix
其中每个条目是(i:Long,j:Long,值:Double)的元组。
注意:只有当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix。(这通常是用户/项目的评分的情况下。)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
现在让我们转换该RDD[MatrixEntry]
到CoordinateMatrix
并提取索引行:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2.保存LabeledPoint数据在libsvm格式
由于Spark 2.0,您可以使用DataFrameWriter
来做到这一点。让我们来创建一些虚拟LabeledPoint数据一个小例子(你也可以用我们前面创建的DataFrame
):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
不幸的是,我们仍然无法使用DataFrameWriter
直接,因为虽然大多数管道组件支持加载向后兼容, 2.0版本之前的Spark版本中的一些现有DataFrames和管道(包含矢量或矩阵列)可能需要迁移到新的spark.ml矢量和矩阵类型。
实用程序,用于将数据帧列从mllib.linalg
到ml.linalg
类型(反之亦然)可以在org.apache.spark.mllib.util.MLUtils.
在我们的案例中找到我们需要做以下(对于虚拟数据和DataFrame
从step 1.
)
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
现在,让我们保存数据框:
convertedVecDF.write.format("libsvm").save("data/foo")
我们可以检查文件内容:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
编辑: 在火花的当前版本(2.1.0),就没有必要使用mllib
包。你可以简单地在LIBSVM保存LabeledPoint
数据,如下面格式:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
这火花的版本,您使用的? – eliasah
使用版本2.0! –