2

我用下面的代码创建集群化模型,然后进行分类每个记录某些集群:pyspark:追加/合并PythonRDD到pyspark数据帧

from pyspark.mllib.clustering import KMeans 
from pyspark.mllib.linalg import Vectors 

spark_df = sqlContext.createDataFrame(pandas_df) 
rdd = spark_df.rdd.map(lambda data: Vectors.dense([float(c) for c in data])) 
model = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") 

result = model.predict(red) 

如何追加预测结果回spark_df作为附加列吗?谢谢!

+0

为什么不首先使用'ml'? – zero323

+0

我无法将spark_df(数据框)转换为spark数据集。如果我只是使用ml的数据帧,它将无法工作。有关如何将数据框转换为数据集的建议?谢谢! – Edamame

+0

如果我使用ml:model = kmeans.fit(spark_df),则出现错误:AnalysisException:u“无法解析给定输入列的'features':[field_1,field_2,... field10];”所以在我看来,我不能直接使用spark_df – Edamame

回答

1

pyspark.mllib.clustering.KMeansModel是可以直接使用PySpark转型中罕见的车型之一,所以你可以简单地mappredict

rdd.map(lambda point: (model.predict(point), point)) 

在一般情况下是不可能zip是这个职位的合适的工具:

rdd.zip(model.predict(rdd)) 
+0

zip给出了一个元组与行和预测值..我们如何将它作为列“new_col”添加到数据框本身,以便我们可以使用saveastable将其写回数据库()。 – venkat

+0

@venkat这是'mllib'而不是'ml'。使用'ml'模型只需使用'transform'方法。 – zero323

+0

我在谈论mllib不是ml,我如何获得数据框与额外的预测列并回写到分贝? – venkat