2017-07-24 98 views
0

我想使用Naive分类器模型来预测Spark dataframe的输出类,我使用Spark 2.1.0的结构化流式传输功能。 当我尝试这样做:将Spark数据帧转换为Vector

tokenizer = Tokenizer(inputCol="message",outputCol="logTokenize") 
tokenizeData = tokenizer.transform(stream_df) 

hashingTF = HashingTF(inputCol="logTokenize", outputCol="rawFeatures", numFeatures = 1000) 
featurizedData = hashingTF.transform(tokenizeData) 
stream_df = featurizedData.select("rawFeatures") 

path = "/tmp/NaiveClassifier" 
naive_classifier_model = NaiveBayesModel.load(spark.sparkContext,path) 

predictions = naive_classifier_model.predict(stream_df) 

,我得到了以下错误消息:

TypeError: Cannot convert type <class 'pyspark.sql.dataframe.DataFrame'> into Vector 

stream_df是一个Spark数据框,我想用一个rawFeatures数据框和预测班列。

回答

0

使用pyspark.ml.feature.VectorAssembler转变为载体,

from pyspark.ml.feature import VectorAssembler 
vecAssembler = VectorAssembler(inputCols=['rawFeatures'], outputCol="features") 
stream_df = vecAssembler.transform(featurizedData) 

此外,您使用的Tokenzier,Hasing TF变压器为好。所以,我相信你可以使用ML管道将所有transfomers整合在一起。只是一个建议,看看。

+0

我想在那之前,我认为这个问题是朴素分类器对象,当我尝试调用'naive_classifier_model.fit(stream_df)'我得到了一个错误,我认为我无法用'VectorAssembler'的输出调用'predict'函数。是的,最好使用管道,但目前我这样做调试 –

+0

你有vectorassembler尝试同样的错误吗? – Suresh

0

尝试使用浮动:

path = "/tmp/NaiveClassifier" 
naive_classifier_model = NaiveBayesModel.load(spark.sparkContext,path) 

prediction= stream_df.rdd.map(lambda p:(float(naive_classifier_model.predict(p.rawFeatures)))) 
+0

当我尝试:'DataFrame'对象没有属性'map'时,我收到以下错误消息,我使用结构化流。在编程指南中,它标明我们可以应用数据帧的“地图”功能 –

+0

您使用的是什么Spark版本?试试这个 - 'prediction = stream_df.rdd.map(lambda p:(float(naive_classifier_model.predict(p.rawFeatures))))'Spark 2从数据框移除直接映射 –