2017-05-16 60 views
0

嗨,我是Pyspark Streaming的新手。Pyspark流式转换错误

numbers0 = sc.parallelize([1,2,3,4,5]) 

numbers1 = sc.parallelize([2,3,4,5,6]) 

numbers2 = sc.parallelize([3,4,5,6,7]) 

stream0 = ssc.queueStream([numbers0, numbers1, numbers2]) 

stream0.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

这工作得很好,但只要我做了以下我得到一个错误:

stream1 = stream0.transform(lambda x: x.mean()) 

stream1.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

我要的是数据流只由我以前流的平均值。 有谁知道我必须做什么?

+0

有什么错误? –

+0

通过这种方式使用变换,您将有3个条目是每个RDD的手段。 –

回答

0

调用变换时出现的错误是因为它需要一个RDD-RDD函数,如Spark's documentation for the transform operation中所述。当在RDD上调用平均值时,它不会返回新的RDD并因此返回错误。

现在,根据我的理解,您要计算由DStream组成的每个RDD的平均值。 DStream是使用queueStream创建的,并且由于命名参数oneAtATime保留为默认值,因此您的程序将在每个批处理间隔使用一个RDD。

计算平均每个RDD,你通常会做这样的forEachRDD输出操作中这样

# Create stream0 as you do in your example 

def calculate_mean(rdd): 
    mean_value = rdd.mean() 
    # do other stuff with mean_value like saving it to a database or just print it 

stream0.forEachRDD(calculate_mean) 

# Start and stop the Streaming Context