3

I'm trying to use a Random Forest model to predict a stream of examples, but it seems I can't use the model to classify them. Here is the pyspark code, combining Spark Streaming + MLlib:

sc = SparkContext(appName="App") 

model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', numTrees=150) 


ssc = StreamingContext(sc, 1) 
lines = ssc.socketTextStream(hostname, int(port)) 

parsedLines = lines.map(parse) 
parsedLines.pprint() 

predictions = parsedLines.map(lambda event: model.predict(event.features)) 
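(`parse` above is the asker's helper and is not shown in the question; a minimal, hypothetical sketch that turns each incoming line into a LabeledPoint, assuming comma-separated values with the label first, might look like this:)

from pyspark.mllib.regression import LabeledPoint

def parse(line):
    # Hypothetical parser: assumes each line arrives as "label,feature1,feature2,..."
    values = [float(x) for x in line.split(",")]
    return LabeledPoint(values[0], values[1:])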

When I run it on the cluster, it returns this error:

Error : "It appears that you are attempting to reference SparkContext from a broadcast " 
    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. 

Is there a way to use a model generated from static data to predict streaming examples?

Thank you all, I really appreciate it!

+0

I wrote a similar question here: https://stackoverflow.com/questions/48846882/pyspark-ml-streaming –

Answer

3

Yes, you can use a model generated from static data. The problem you are running into has nothing to do with streaming at all. You simply cannot use a JVM-based model inside an action or a transformation (see How to use Java/Scala function from an action or a transformation? for an explanation of why). Instead, apply the predict method to the whole RDD, for example using transform on the DStream:

from pyspark.mllib.tree import RandomForest 
from pyspark.mllib.util import MLUtils 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from operator import attrgetter 


sc = SparkContext("local[2]", "foo") 
ssc = StreamingContext(sc, 1) 

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') 
trainingData, testData = data.randomSplit([0.7, 0.3]) 

model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3
)

(ssc 
    .queueStream([testData]) 
    # Extract features 
    .map(attrgetter("features")) 
    # Predict 
    .transform(lambda _, rdd: model.predict(rdd)) 
    .pprint()) 

ssc.start() 
ssc.awaitTerminationOrTimeout(10) 
+0

How would you convert the strings coming from the socket into labeled points? –

+0

I wrote a similar question here: https://stackoverflow.com/questions/48846882/pyspark-ml-streaming –