2014-05-21 48 views
8

我处于探索卡夫卡版本0.8.1.1的初始阶段。卡夫卡在一个组中的单一消费者故障

我已经成功地运行了消费者组示例,其中有多个分区及其在消费者中的消息发布情况。

我想运行的一个测试用例是组中的用户突然死亡(例如,kill -9)。当我这样做时,我预计再平衡发生,但没有发生。那么,我可以做这些事情吗?使用API​​

  • 配置卡夫卡

    1. 触发再平衡需要等待一定的时间,为消费者活动,并重新平衡自动假设它是正常关机。

    这里的问题是,分配给死亡使用者的分区中的所有消息都保留在队列中,直到重新平衡发生时才会处理。

  • +0

    我想提到的相同的方式,但issue..can你点我的线索仍面临着相同的http://stackoverflow.com/questions/39241872/kafka-common-consumerrebalancefailedexception-cant-rebalance-after -4 – Ratha

    回答

    7

    重新平衡将自动发生,可以在消费者配置(zookeeper.session.timeout.ms)中设置。按照该文件建立

    zookeeper.session.timeout.ms:ZooKeeper的会话超时。如果消费者在这段时间内没有对动物园管理员心跳,它被认为是死亡,并且会发生重新平衡。默认值为6000毫秒

    同一组中的其他现场消费者将在超时间隔后开始收到消息。

    根据您的要求配置此超时值。

    同样来自卡夫卡文档一些更多的信息:

    消费再平衡失败(你会看到ConsumerRebalanceFailedException):这是由于当两个用户试图拥有相同的主题划分冲突。日志会显示导致冲突的原因(搜索“冲突”)。

    1. 如果消费者订阅数量的主题和你的ZK服务器繁忙,这可能被消费者所造成没有足够的时间来看看在同一组中的所有消费者的一致视图。如果是这种情况,请尝试增加rebalance.max.retries和rebalance.backoff.ms。
    2. 另一个原因可能是其中一名消费者被杀死。再平衡期间的其他消费者不会意识到消费者在zookeeper.session.timeout.ms时间之后消失了。在这种情况下,请确保rebalance.max.retries * rebalance.backoff.ms> zookeeper.session.timeout.ms。
    +0

    谢谢,我已经这样做了,经过了很多次。 但是,可能杀死我的代码“模拟”是不够的。我已经通过消费者线程 consumerThread.interrupt(); 可能没有中断与zk的连接。 当我将消费者作为多个进程运行并杀死它们时,它完美运行,即发生了重新平衡。谢谢! – binit

    +0

    我试过了,但没有解决我的问题。 http://stackoverflow.com/questions/39241872/kafka-common-consumerrebalancefailedexception-cant-rebalance-after-4 – Ratha