2017-07-17 108 views
0

我有以下程序来消耗所有来到Kafka的消息。如何关闭卡夫卡消费者一旦消费完所有消息?

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092']) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

使用上面的程序,我能够消耗所有来到卡夫卡的消息。但是,一旦所有消息都消耗完了,我想关闭不在发生的卡夫卡消费。我需要帮助。

回答

0

如果我向KafkaConsumer对象提供consumer_timeout_ms参数,我现在可以关闭卡夫卡消费者。它接受以毫秒为单位的超时值。 以下是代码片段。

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092'], 
         consumer_timeout_ms=1000) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

在上面的代码中,如果消费者在1秒内没有看到任何消息,它将关闭会话。

0

它看起来像你想要consumer.close()而不是KafkaConsumer.close()。它没有被记录为静态方法。

+0

我试过使用consumer.close(),但它似乎没有出来的for循环本身执行close()。可能是我想念什么不知道什么。 – pankmish

+0

我认为你的循环将会无限期地等待更多的消息。你必须给它一个退出条件才能结束。 – dawsaw