2017-04-12 137 views
3

我有一个卡夫卡Streams应用程序从消费国和生产的卡夫卡集群3名经纪人和3.除了消费者偏移主题(50个分区)复制的因素,所有其他议题都只有一个分区。卡夫卡领导人选举造成卡夫卡流崩溃

当经纪人尝试一个优选的复制品的选举中,流应用(其在一个完全不同的实例比经纪人运行)失败,错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing 
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) 
    ... 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) 
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 

是正常的是,流应用尝试成为分区的领导者,因为它运行在不属于Kafka集群的服务器上?

我可以重现由需求这种行为:

  1. 查杀经纪公司之一(于是其他2接管该有杀经纪人为他们的领袖的所有分区的领导者,如预期)
  2. 瞻打死经纪人备份
  3. 触发首选副本领导人选举与bin/kafka-preferred-replica-election.sh --zookeeper localhost

我的问题似乎是与此类似reported failure,所以我想知道这是否是一个新的Kafka Streams错误。我的完整堆栈跟踪与报告失败链接的要点完全相同(here)。

另一个潜在的有趣的细节是,领导人选举期间,我得到了经纪人的controller.log这些消息:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread) 
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed 
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) 
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) 
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) 
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) 
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 

我最初以为这方面的错误是罪魁祸首,但领导人选举崩溃后Streams应用程序,如果我重新启动Streams应用程序,它会正常工作,直到下一次选举,没有我触及经纪人。

所有服务器(3名卡夫卡经纪人和所述流应用)是在EC2实例运行。

回答

5

现在固定在0.10.2.1。如果你不能挑选时,要确保你有这两个参数设置为你流的配置如下:

final Properties props = new Properties(); 
... 
props.put(ProducerConfig.RETRIES_CONFIG, 10); 
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); 
+1

这两个参数(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,20000),用'props.put一起;'解决问题!谢谢! –