2017-06-01 37 views
0

我需要帮助:如何用 pyspark 从本地目录读取数据并运行流式 KMeans。StackOverflow 上有没有关于这个主题的好答案——如何在 pyspark 中从本地目录读取数据并运行流式 KMeans?

这里是我的代码

if __name__ == "__main__":
    # 1-second micro-batch interval on the already-created SparkContext `sc`.
    ssc = StreamingContext(sc, 1)

    # Load and parse the training set (helpers defined elsewhere in the file).
    training_data_raw, training_data_df = prepare_data(TRAINING_DATA_SET)
    trainingData = parse2(training_data_raw)

    # Load and parse the test set.
    testing_data_raw, testing_data_df = prepare_data(TEST_DATA_SET)
    testingData = testing_data_raw.map(parse1)

    # Wrap each RDD in a one-element queue so it is delivered as a single batch.
    trainingQueue = [trainingData]
    testingQueue = [testingData]

    trainingStream = ssc.queueStream(trainingQueue)
    testingStream = ssc.queueStream(testingQueue)

    # Streaming k-means: 2 clusters, decayFactor=1.0 (no forgetting of old
    # data), initialized with 3-dimensional random centers (weight 1.0, seed 0).
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    # Continuously update the model on the training stream, and print the
    # predicted cluster assignment for each labeled test point as it arrives.
    model.trainOn(trainingStream)

    result = model.predictOnValues(
        testingStream.map(lambda lp: (lp.label, lp.features)))
    result.pprint()

    # BUG FIX: the original code called
    #   ssc.textFileStream('file:///Users/userrname/PycharmProjects/MLtest/training/data/')
    # here and discarded the returned DStream, registering an input stream that
    # the context monitors forever but that feeds nothing. To actually train
    # from files dropped into a local directory, build the training stream from
    # it BEFORE trainOn(), e.g.:
    #   trainingStream = ssc.textFileStream(
    #       'file:///Users/userrname/PycharmProjects/MLtest/training/data/'
    #   ).map(Vectors.parse)
    ssc.start()
    ssc.awaitTermination()

谢谢!

回答

1
from pyspark.mllib.linalg import Vectors 
# Watch the directory for newly added text files and parse every line (a
# vector string such as "[1.0,2.0,3.0]") into an MLlib vector suitable for
# StreamingKMeans training.
# NOTE(review): assumes an active StreamingContext `ssc` defined earlier.
trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) 

对于测试样例,示例如下:

from pyspark.mllib.regression import LabeledPoint


def parse(lp):
    """Parse one text line of the form "(label,[f1,f2,...])" into a LabeledPoint.

    The label is the number between '(' and the first ',', and the features are
    the comma-separated values between '[' and ']'.
    """
    label_start = lp.find('(') + 1
    label_end = lp.find(',')
    label = float(lp[label_start:label_end])

    feat_start = lp.find('[') + 1
    feat_end = lp.find(']')
    features = Vectors.dense(lp[feat_start:feat_end].split(','))

    return LabeledPoint(label, features)


# Watch the directory and parse each arriving line into a LabeledPoint.
testData = ssc.textFileStream("/testing/data/dir").map(parse)