我完全是Spark的新手,目前我正尝试使用Python编写一个简单的代码,用于对一组数据执行KMeans 。如何将类型<class'pyspark.sql.types.Row'>转换为Vector
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import re
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg import SparseVector
from numpy import array
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import pandas as pd
import numpy
df = pd.read_csv("/<path>/Wholesale_customers_data.csv")
sql_sc = SQLContext(sc)
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
s_df = sql_sc.createDataFrame(df)
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature")
vdf = vectorAss.transform(s_df)
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||")
model = kmeans.fit(vdf)
cluster = model.clusterCenters()
print(cluster)
我输入到这些pyspark壳,并且当它运行模型= kmeans.fit(VDF),我得到以下错误:
TypeError: Cannot convert type into Vector
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:275) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/init.py", line 77, in _convert_to_vector raise TypeError("Cannot convert type %s into Vector" % type(l)) TypeError: Cannot convert type into Vector The
数据我是从:https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv
有人能告诉我这里出了什么问题,我错过了什么?我感谢任何帮助。
谢谢!
UPDATE: @Garren 我得到的错误是:
The errors I got is: >>> kmm = kmeans.fit(s_df)17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB) 17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5 17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB) 17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4
Traceback (most recent call last): File "", line 1, in File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit return self._fit(dataset) File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit java_model = self._fit_java(dataset) File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java return self._java_obj.fit(dataset._jdf) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"
你在哪一行出错? –
嗨Vivek,行是:model = kmeans.fit(vdf) – hpnhxxwn