
I am completely new to Spark, and I am currently trying to write a simple piece of Python code that runs KMeans on a dataset. How do I convert the type <class 'pyspark.sql.types.Row'> into a Vector?

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
import re 
from pyspark.mllib.clustering import KMeans, KMeansModel 
from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.linalg import SparseVector 
from numpy import array 
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.feature import MinMaxScaler 

import pandas as pd 
import numpy 
df = pd.read_csv("/<path>/Wholesale_customers_data.csv") 
sql_sc = SQLContext(sc) 
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"] 
s_df = sql_sc.createDataFrame(df) 
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature") 
vdf = vectorAss.transform(s_df) 
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||") 
model = kmeans.fit(vdf) 
cluster = model.clusterCenters() 
print(cluster) 

I typed this into the pyspark shell, and when it runs model = kmeans.fit(vdf), I get the following error:

TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
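For reference, the mllib KMeans.train used in the snippet above expects an RDD of mllib Vectors, not a DataFrame of Rows, which is where this TypeError comes from. A minimal sketch of that conversion (reusing s_df and cols from the snippet; not part of the original session) would look roughly like:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import KMeans

# map each Row to a DenseVector of its column values
vector_rdd = s_df.rdd.map(lambda row: Vectors.dense([float(row[c]) for c in cols]))

# mllib KMeans trains on an RDD of Vectors
model = KMeans.train(vector_rdd, k=2, maxIterations=10, initializationMode="k-means||")
print(model.clusterCenters)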

The data I used is from: https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv

Can someone tell me what is wrong here and what I am missing? I appreciate any help.

Thanks!

UPDATE: @Garren The error I got is:

>>> kmm = kmeans.fit(s_df)
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5
17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit
    return self._fit(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"
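For reference, this AnalysisException just means the estimator cannot find an assembled vector column: the ML KMeans defaults to featuresCol="features", so fit has to be called on the VectorAssembler output and the column names have to match. A minimal sketch of two ways to line them up (reusing s_df and cols from the question; not part of the original session):

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Option 1: name the assembled column "features", the estimator's default featuresCol
assembler = VectorAssembler(inputCols=cols, outputCol="features")
kmm = KMeans(k=2, maxIter=10).fit(assembler.transform(s_df))

# Option 2: keep outputCol="feature" and point the estimator at that column
assembler = VectorAssembler(inputCols=cols, outputCol="feature")
kmm = KMeans(k=2, maxIter=10, featuresCol="feature").fit(assembler.transform(s_df))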


On which line do you get the error? –


Hi Vivek, the line is: model = kmeans.fit(vdf) – hpnhxxwn

Answer


Use the Spark 2.x ML package only, instead of the [soon to be deprecated] spark mllib package:

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# `spark` is the SparkSession provided by the Spark 2.x shell
df = spark.read.option("inferSchema", "true").option("header", "true").csv("whole_customers_data.csv")
cols = df.columns

# assemble all input columns into a single "features" vector column,
# which is the default featuresCol the ML KMeans estimator looks for
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df)

# DataFrame-based KMeans: fit on the assembled DataFrame
kmeans = KMeans(k=2, maxIter=10, seed=1)
kmm = kmeans.fit(vdf)
kmm.clusterCenters()
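To see which cluster each row was assigned to, the fitted model can be applied back to the assembled DataFrame, for example:

predictions = kmm.transform(vdf)           # adds a "prediction" column
predictions.select("prediction", *cols).show(5)

for center in kmm.clusterCenters():        # cluster centers as numpy arrays
    print(center)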

Hi Garren, could you share your code? I ran the code and got errors.. Thanks for your help! – hpnhxxwn


@hpnhxxwn I will share the code in my updated answer. Please also post your errors so that others can learn from them too. – Garren


Thanks! This worked! – hpnhxxwn
