2017-05-21 125 views
0

我想用两个RDD的列创建一个Dataframe。 第一个是我从CSV获得的RDD,第二个是另一个RDD,每行都有一个集群预测。将RDD添加到DataFrame列PySpark

我的模式是:

customSchema = StructType([ \ 
StructField("Area", FloatType(), True), \ 
StructField("Perimeter", FloatType(), True), \ 
StructField("Compactness", FloatType(), True), \ 
StructField("Lenght", FloatType(), True), \ 
StructField("Width", FloatType(), True), \ 
StructField("Asymmetry", FloatType(), True), \ 
StructField("KernelGroove", FloatType(), True)]) 

地图我RDD和创建数据框:

FN2 = rdd.map(lambda x: (float(x[0]), float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6]))) 
df = sqlContext.createDataFrame(FN2, customSchema) 

而且我的群集预测:

result = Kmodel.predict(rdd) 

所以,最后我想有在我的DataFrame中我的CSV行和他们的集群预测在最后。

我试图添加一个新的列与.WithColumn(),但我什么都没有。

谢谢。

回答

0

如果你有两个数据帧在一个共同的领域,然后用键连接,否则创建一个唯一的ID,并加入两个数据框获得CSV及其集群预测的排在一个单一的数据帧

Scala代码生成每行的唯一ID,尝试转换为pyspark。你需要生成一个提高排ID和行ID加入他们

import org.apache.spark.sql.types.{StructType, StructField, LongType} 
val df = sc.parallelize(Seq(("abc", 2), ("def", 1), ("hij", 3))).toDF("word", "count") 
val wcschema = df.schema 
val inputRows = df.rdd.zipWithUniqueId.map{ 
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 
val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields)) 

或使用SQL查询

val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount") 
tmpTable1.show()