0
我有一个配置流中提到,像这样一个pyspark程序,可以接受多个卡夫卡:Pyspark多卡夫卡流覆盖变量
[stream1]
server=10.0.0.1:9090
topic=log_topic
[stream2]
server=10.0.0.2:9090
topic=file_topic
所以我的代码使用所提到的配置加载多个数据流是这样的:
from configobj import ConfigObj
config = ConfigObj("my.conf")
for i, j in conf.iteritems():
stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
stream.pprint()
现在说,如果流1具有以下传入内容:
aaaaa
aaaaa
aaaaa
...
和流有以下内容:
bbbbb
bbbbb
bbbbb
...
使用pprint功能,我期待看到下面的输出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
aaaaa
aaaaa
aaaaa
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
但我看到下面的输出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
我理解似乎有延迟加载或者在for循环的第二次迭代之后读取变量stream
之后。任何人都可以让我知道如何实现这一点,以便我可以在for循环中处理2个独立的流。
谢谢!
张感谢您的回复。但我不需要流的联合,而是我想基于某些逻辑将两个流的不同映射缩减函数应用于这两个流。我需要分开的流。 –