我需要知道如何使用“为”我的卡夫卡KStreams线环......下面是我的“for”循环需要被列入KStreams“为”卡夫卡KStreams循环支持
for (int i = 0; i < 6 ; i++) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
和我KStreams loooks像
KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[",""))
如何添加我上面的“for”循环到我KStreams
这个for循环的确切目的是什么? KStream对象只是一种构建将在其他线程中运行的拓扑(在.start()调用之后)的方法。在你的代码中,你只需要在你的拓扑结构中增加6倍于同一个处理器,睡眠部分对流的执行没有任何影响,但是只会延缓拓扑构建。 – nbchn
@nbchn ok ...事情是我在'for'循环中使用了value.split来分割我的数据....所以每当我的数据被分割时它应该会睡10ms左右...这是因为我需要我的数据来一个接一个,如果你需要更多的细节让我知道 –