
I am writing a Spark Streaming program to detect anomalies in a data center network, and I am trying to use a regression algorithm. For example, I compute a model (i.e., the coefficients) from a training data set; how do I then apply this previously computed model to the data stream? I tried the join below but got an exception. How can I join a stream RDD with a previously computed result in Spark Streaming?

Traceback (most recent call last):
  File "/home/xiuli/PycharmProjects/benchmark/parx.py", line 98, in <module>
    joinedStream = testRDD.join(trainingRDD)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 362, in join
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 313, in transformWith
AttributeError: 'PipelinedRDD' object has no attribute '_jdstream'

I can see that the Spark Streaming programming guide gives an example, but it lacks details.

Stream-dataset joins

This has already been shown earlier while explaining the DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.

dataset = ... # some RDD 
windowedStream = stream.window(20) 
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset)) 
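
To make the fragment above concrete, here is a minimal self-contained version of the same pattern (the socket source, batch interval, window length, and the contents of dataset are placeholders I filled in, not part of the guide):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", appName="StreamDatasetJoin")
ssc = StreamingContext(sc, 5)  # 5-second batches (placeholder)

# A static, precomputed (key, value) RDD standing in for "some RDD".
dataset = sc.parallelize([("host1", 1.0), ("host2", 2.0)])

# Key the stream the same way as the dataset so the pair-RDD join lines up.
stream = ssc.socketTextStream("localhost", 9999)  # placeholder source
keyed = stream.map(lambda line: (line.split("|")[0], line))

# The window length must be a multiple of the batch interval.
windowedStream = keyed.window(20)

# transform hands each micro-batch to the lambda as a plain RDD,
# so an ordinary RDD-to-RDD join with the static dataset is legal here.
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
joinedStream.pprint()

ssc.start()
ssc.awaitTermination()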

Here is my code:

from __future__ import print_function 
import sys,os,datetime 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql.context import SQLContext 
from pyspark.resultiterable import ResultIterable 
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD 
import numpy as np 
import statsmodels.api as sm 


def splitLine(line, delimiter='|'):
    # Key each record by (id, hour) parsed from the timestamp field;
    # the remaining fields become the value.
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]

def reg_m(y, x):
    # Ordinary least squares via statsmodels; returns the fitted results.
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    # Build the regression target and features from groups of four
    # consecutive readings plus the accompanying temperature, then fit
    # the model and return (key, coefficients).
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]), reg_m(y, x).params.tolist()

def detect(line):
    # Mirrors train(), but keeps the key unchanged; intended to run
    # against the streaming data.
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return line[0], reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: parx.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    # Compute the model coefficients once, from the static training data.
    trainingLines = sc.textFile(trainingInput)
    trainingRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                               .groupByKey()\
                               .map(lambda line: train(line)).cache()

    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))

    testRDD = lines.groupByKeyAndWindow(1, 1).map(lambda line: (str(line[0]), line[1]))
    joinedStream = testRDD.join(trainingRDD)  # this line raises the AttributeError above
    joinedStream.pprint(20)

    ssc.start()
    ssc.awaitTermination()

I know this is old, but have you found a solution? – Bg1850

Answer


Based on the documentation you mentioned, try:

testRDD.transform(lambda rdd: rdd.join(trainingRDD)) 
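
The join in the question fails because DStream.join expects another DStream: internally it calls transformWith, which looks for a _jdstream attribute, and trainingRDD is a plain (pipelined) RDD, hence the AttributeError. transform instead hands each micro-batch to your function as an ordinary RDD, where an RDD-to-RDD join with the static training data is legal. Applied to the code in the question, only the failing line changes:

testRDD = lines.groupByKeyAndWindow(1, 1).map(lambda line: (str(line[0]), line[1]))
joinedStream = testRDD.transform(lambda rdd: rdd.join(trainingRDD))
joinedStream.pprint(20)

Because trainingRDD is cached, the closure reuses the same computed coefficients on every batch. If the model is small, a common alternative (not part of this answer, just a standard Spark pattern) is to skip the join entirely: collect the (key, coefficients) pairs once, broadcast them, and look them up per record:

model = sc.broadcast(dict(trainingRDD.collect()))
scored = testRDD.map(lambda kv: (kv[0], (kv[1], model.value.get(kv[0]))))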