2015-04-07 42 views
14

我使用0.8.2.1 kafka-clients库连接到Kafka。当卡夫卡上线时,我能够成功连接,但当卡夫卡倒闭时,我想优雅地处理失败。下面是我的配置:我如何优雅地处理卡夫卡中断?

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); 
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3"); 
producer = new KafkaProducer(kafkaProperties); 

当卡夫卡下来,我得到了我的日志以下错误:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75] 
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75] 
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] 

在无限循环中重复这个错误,并锁定了我的Java应用程序。我尝试了与超时,重试和确认有关的各种配置设置,但是我一直无法防止发生这种循环。

有没有可以防止这种情况的配置设置?我需要尝试不同版本的客户端吗?如何优雅地处理Kafka中断?

回答

20

我想通了,这种设置组合使卡夫卡客户端迅速失败,但不会持有线程或垃圾邮件日志:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000"); 
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000"); 

我不喜欢的是,卡夫卡客户持有,同时试图连接到该线程卡夫卡服务器,而不是完全异步,但这至少是功能。

+0

非常有帮助,谢谢。这是正式记录在任何地方? – maxenglander

+0

@maxenglander:不是我看到的。我不得不通过代码去探索,并且有很多尝试和错误。 –

+0

“而不是完全异步” – vbence

0

在0.9客户端中,还有max.block.ms属性,这将限制允许客户端运行的时间。