2016-02-16 53 views
1

我试图在卡夫卡创建新主题时启动动态消费者,但动态启动的消费者始终缺少启动/第一条消息,但从此消费该消息。我正在使用kafka-python模块,并使用更新的KafkaConsumer和KafkaProducer。卡夫卡消费者没有获取所有邮件

代码生产者是

producer = KafkaProducer(bootstrap_servers='localhost:9092') 
record_metadata = producer.send(topic, data) 

和消费者的代码是

consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest') 

请提出来的东西来过这个问题或任何配置我在我的生产者和消费者实例包括。

+0

你是什么意思的动态消费者? – puneet

+0

每当创建新主题时,我都会启动消费者。我们称这些消费者在运行时就像动态发布一样动态。 –

+0

在您的消费者中,您正在提供主题。你如何知道主题名称?数据发布后是否创建了消费者流?检查我的答案...我认为这是可能的解决方案 – puneet

回答

2

你可以设置auto_offset_reset最早。

创建新消费者流时,它将从最新的偏移量(auto_offset_reset的默认值)开始,并且您将错过消费者未启动时发送的消息。

你可以在kafka python doc的地方阅读。相关部分低于

auto_offset_reset(STR) - 一种在 OffsetOutOfRange错误复位偏移政策:“最早的”将移动到最早的可用 消息,“最新的”将移动到最近。任何其他值将会引起异常 。默认:'最新'。