2017-12-18 131 views
0

我需要知道如何使用“为”我的卡夫卡KStreams线环......下面是我的“for”循环需要被列入KStreams“为”卡夫卡KStreams循环支持

for (int i = 0; i < 6 ; i++) { 
      try { 
       textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); 
       Thread.sleep(2000); 
      }catch (InterruptedException e){ 
       e.printStackTrace(); 
      } 
     } 

和我KStreams loooks像

KStream<String, String> textlines = builder.stream("intopic"); 
KStream<String, String> mstream = textlines 
       .mapValues(value -> value.replace("[","")) 

如何添加我上面的“for”循环到我KStreams

+1

这个for循环的确切目的是什么? KStream对象只是一种构建将在其他线程中运行的拓扑(在.start()调用之后)的方法。在你的代码中,你只需要在你的拓扑结构中增加6倍于同一个处理器,睡眠部分对流的执行没有任何影响,但是只会延缓拓扑构建。 – nbchn

+0

@nbchn ok ...事情是我在'for'循环中使用了value.split来分割我的数据....所以每当我的数据被分割时它应该会睡10ms左右...这是因为我需要我的数据来一个接一个,如果你需要更多的细节让我知道 –

回答

1

的东西是我用过value.split在“for”循环分裂我的数据.. ..所以,每当我的数据分裂它应该睡10ms左右...这是因为我需要我的数据来一个接一个

从你说你想要订购。要达到您的要求,您不需要sleep。它会工作。 Kafka Streams WordCount示例(我假设您的代码基于此示例)的作用相同:它也使用flatMapValues,传入平面贴图的lambda将文本行分割为单词。

除非我和其他人误解了你的问题(在这种情况下,你可能应该进一步澄清你的问题),我认为你不必要地复杂了你的代码。

+0

好吧,因为你说.... flatMapValues使数据来一个接一个? –

+0

[链接](https://stackoverflow.com/questions/47817572/adding-for-loop-with-delay-in-kafka-streams)你可以看看这个例子...这正是我所需要的做... –

+0

这是相同的答案。只需使用'flatMapValues',它可以确保数据一个接一个。 –