2016-04-11 36 views
4

我正在使用spark ML管道在真正宽的表格上设置分类模型。这意味着我必须自动生成所有处理列的代码,而不是精确地输入它们中的每一个。我几乎是一个初学者在Scala和火花。当我尝试做如下操作时,我被卡在VectorAssembler()部分:Spark ML VectorAssembler()处理数据帧中的数千列

val featureHeaders = featureHeader.collect.mkString(" ") 
//convert the header RDD into a string 
val featureArray = featureHeaders.split(",").toArray 
val quote = "\"" 
val featureSIArray = featureArray.map(x => (s"$quote$x$quote")) 
//count the element in headers 
val featureHeader_cnt = featureHeaders.split(",").toList.length 


// Fit on whole dataset to include all labels in index. 
import org.apache.spark.ml.feature.StringIndexer 
val labelIndexer = new StringIndexer(). 
    setInputCol("target"). 
    setOutputCol("indexedLabel") 

val featureAssembler = new VectorAssembler(). 
    setInputCols(featureSIArray). 
    setOutputCol("features") 

val convpipeline = new Pipeline(). 
    setStages(Array(labelIndexer, featureAssembler)) 

val myFeatureTransfer = convpipeline.fit(df) 

显然它没有工作。我不确定我该怎么做才能让整个事情变得更加自动化,或者ML管道在这一刻不会占用那么多列(我怀疑)?

+0

这仍然不适用于我。我认为我的输入数据框很好。我可以很容易地创建一个标记点​​,将其输入到MLlib中,但不能用于ML管道。请指教,谢谢! –

回答

0

除非列名包含引号,否则不应使用引号(s"$quote$x$quote")。尝试

val featureAssembler = new VectorAssembler(). 
    setInputCols(featureArray). 
    setOutputCol("features") 
+0

谢谢,但使用featureArray也无法正常工作。它给出了这个错误: WARN TaskSetManager:在阶段28.0中丢失的任务0.2:java.lang.ArrayIndexOutOfBoundsException 错误TaskSetManager:阶段28.0中的任务0失败4次;中止作业 org.apache.spark.SparkException:由于阶段失败而导致作业中止:阶段28.0中的任务0失败4次,最近一次失败:java.lang.ArrayIndexOutOfBoundsException –

+0

它与此无关。看起来你有不正确的数据。 –

+0

感谢您的帮助,我将重新审视我的输入数据。 =) –

0

我终于想出了一种方法,这不是很漂亮。它是为特征创建vector.dense,然后创建数据框架。

import org.apache.spark.mllib.regression.LabeledPoint 
val myDataRDDLP = inputData.map {line => 
val indexed = line.split('\t').zipWithIndex 
val myValues = indexed.filter(x=> {x._2 >1770}).map(x=>x._1).map(_.toDouble) 
val mykey = indexed.filter(x=> {x._2 == 3}).map(x=>(x._1.toDouble-1)).mkString.toDouble 
LabeledPoint(mykey, Vectors.dense(myValues)) 
} 
val training = sqlContext.createDataFrame(myDataRDDLP).toDF("label", "features")