我有以下程序来消耗所有来到Kafka的消息。如何关闭卡夫卡消费者一旦消费完所有消息?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
使用上面的程序,我能够消耗所有来到卡夫卡的消息。但是,一旦所有消息都消耗完了,我想关闭不在发生的卡夫卡消费。我需要帮助。
我试过使用consumer.close(),但它似乎没有出来的for循环本身执行close()。可能是我想念什么不知道什么。 – pankmish
我认为你的循环将会无限期地等待更多的消息。你必须给它一个退出条件才能结束。 – dawsaw