2015-05-14 160 views
2

我正在使用卡夫卡风暴整合。 Kafka会将数据加载到队列中,Kafka Spout将提取数据和进程。我有以下设计。风暴KafkaSpout失败,当螺栓很慢

Kafka -> Queue -> KafkaSpout -> Process1 Bolt -> Process2 Bolt 

问题是,如果进程2博尔特正在较长的时间来处理KafkaSpout是越来越失败的数据,并再次尝试读取数据从队列这会导致重复的记录。

如果螺栓加工缓慢,为什么KafkaSpout将其视为失败?解决办法是什么?是否有任何超时或任何类似的属性,我必须在风暴中设置?

回答

3

如果处理时间过长,默认30秒,Storm会失败一个元组。自Storm guarantees processing以来,一旦失败,Kafka喷口将重放相同的消息,直到成功处理元组。


From doc

元组被认为是失败时它的消息的树未能在规定的超时时间内得到充分的处理。可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置在特定于拓扑的基础上配置此超时,默认值为30秒