I have a CSV file with three (string) columns, and I am trying to cluster the data with k-means in Apache Spark, using a Zeppelin notebook.
Here is the code:
case class kmeansScore(k: String, score: String, j: String)
val rawData = sc.textFile("/resources/data/v1.csv")
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
import org.apache.spark.mllib.linalg._
val labelsAndData = rawData.zipWithIndex.flatMap {
  case (line, index) =>
    if (index == 0) {
      None
    } else {
      val buffer = line.split(',').toBuffer
      buffer.remove(1, 4)
      val label = buffer.remove(buffer.length - 1)
      val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
      Some((label, vector))
    }
}
import org.apache.spark.mllib.clustering._
def distance(a: Vector, b: Vector) = math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
def distToCentroid(datum: Vector, model: KMeansModel) = {
val cluster = model.predict(datum)
val centroid = model.clusterCenters(cluster)
distance(centroid, datum)
}
import org.apache.spark.rdd._
val dataAsArray = labelsAndData.values.map(_.toArray).cache()
dataAsArray.first().length
But I get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 138, localhost): java.lang.IndexOutOfBoundsException: 1
at scala.collection.mutable.ArrayBuffer.remove(ArrayBuffer.scala:158)
What is the problem? I am working in Zeppelin at https://my.datascientistworkbench.com/tools/zeppelin-notebook/
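For what it's worth, the exception can be reproduced outside Spark. This is a minimal plain-Scala sketch (not the actual notebook data; the three-element row is an assumption based on the CSV having three columns) showing how `ArrayBuffer.remove(1, 4)` behaves on a short row:

```scala
import scala.collection.mutable.ArrayBuffer

// A hypothetical 3-column row, like one line of the CSV after split(',')
val buffer = "a,b,c".split(',').toBuffer

// remove(1, 4) asks to drop 4 elements starting at index 1, but only
// 2 elements exist from index 1 onward, so ArrayBuffer throws
// java.lang.IndexOutOfBoundsException
try {
  buffer.remove(1, 4)
} catch {
  case e: IndexOutOfBoundsException => println(s"caught: $e")
}
```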
The link you provided requires a login. –
@MattiLyra Yes, I have an account; what is your point? –
What use is that link to anyone who doesn't have an account there? Are we supposed to look at the notebook? –