2017-09-13 83 views
0

我有一个配置流中提到,像这样一个pyspark程序,可以接受多个卡夫卡:Pyspark多卡夫卡流覆盖变量

[stream1] 
server=10.0.0.1:9090 
topic=log_topic 

[stream2] 
server=10.0.0.2:9090 
topic=file_topic 

所以我的代码使用所提到的配置加载多个数据流是这样的:

from configobj import ConfigObj 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    stream.pprint() 

现在说,如果流1具有以下传入内容:

aaaaa 
aaaaa 
aaaaa 
... 

和流有以下内容:

bbbbb 
bbbbb 
bbbbb 
... 

使用pprint功能,我期待看到下面的输出:

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
aaaaa 
aaaaa 
aaaaa 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

但我看到下面的输出:

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

我理解似乎有延迟加载或者在for循环的第二次迭代之后读取变量stream之后。任何人都可以让我知道如何实现这一点,以便我可以在for循环中处理2个独立的流。

谢谢!

回答

0

喜欢的东西:

streams = [] 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    streams.append(stream) 

all_topic = ssc.union(*streams)  
all_topic.pprint() 
+0

张感谢您的回复。但我不需要流的联合,而是我想基于某些逻辑将两个流的不同映射缩减函数应用于这两个流。我需要分开的流。 –