2016-11-20 49 views
0

我需要找到每个RDD的最大密钥,但使用reduce()时,我能得到的是整个Dstream中最大的一个。例如,在这个流中,我想要返回的是(2,“b”),(2,“d”),(3,“f”),但是我只能得到(3,“f” )通过reduce(max) 我怎样才能得到(2,“b”),(2,“d”),(3,“f”)?我可以获得DStream中每个RDD的最大密钥吗?

sc = SparkContext(appName="PythonStreamingQueueStream") 
ssc = StreamingContext(sc, 1) 
stream = ssc.queueStream([sc.parallelize([(1,"a"), (2,"b"),(1,"c"),(2,"d"), 
(1,"e"),(3,"f")],3)]) 

stream.reduce(max).pprint() 
ssc.start() 
ssc.stop(stopSparkContext=True, stopGraceFully=True) 
+0

只有一个'RDD'流中.... – 2016-11-20 13:20:51

+0

我很抱歉,但我分割数据成3部分由'sc.parallelize' –

+0

否...请阅读'minPartitions'正在做什么:)通过3个RDD,每个批次都得到'max'。 – 2016-11-20 13:28:00

回答

0

此:

stream = ssc.queueStream([sc.parallelize([(1,"a"), (2,"b"),(1,"c"),(2,"d"), 
    (1,"e"),(3,"f")],3)]) 

只有一个批次,其中第一和唯一的一批具有(最小)3个分区创建一个流。我想你想:

stream = ssc.queueStream([ 
    sc.parallelize([(1,"a"), (2,"b")]), 
    sc.parallelize([(1,"c"), (2,"d")]), 
    sc.parallelize([(1,"e"), (3,"f")]), 
]) 

,这将给你预期的结果有:

stream.reduce(max).pprint() 
+0

非常感谢,我还有其他问题。在我的代码中,我将数据分成3个分区,如果我使用'1'而不是'3',结果将是相同的。那么转型的“并行化”意味着什么?节省我们的处理时间或其他东西? –

+0

用'max'减少''的结果将不取决于分区的数量。 'parallelize'创建RDD。 'minPartitions'设置此RDD的分区数量。 – 2016-11-21 08:58:39

+0

抱歉,我仍然对分区的含义感到困惑。更多的分区可以节省更多的操作时间或者只是将大数据分成小数据以帮助分配更多的计算机? –

相关问题