2016-06-21 105 views
0

注意的多个分区数据:使用kafka_2.11-0.9.0.1为什么卡夫卡消费者API不获取由一个主题

我已经创建了一个名为卡夫卡话题:consumer-tutorials3 Partitions如下:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --describe --topic consumer-tutorial 
--zookeeper localhost:2181 
Topic:consumer-tutorial PartitionCount:3  ReplicationFactor:1  Configs: 
     Topic: consumer-tutorial  Partition: 0 Leader: 0  Replicas: 0  Isr: 0 
     Topic: consumer-tutorial  Partition: 1 Leader: 0  Replicas: 0  Isr: 0 
     Topic: consumer-tutorial  Partition: 2 Leader: 0  Replicas: 0  Isr: 0 

一旦创建过主题,我使用Producer API按照以下生产的每个分区中的一些数据:

KafkaProducer<String,String> prod = new KafkaProducer<String,String>(props); 

for(int i =0; i < 3; i++) 
{ 
    for(int x=start; x<end; x++) 
    { 
     prod.send(new ProducerRecord<String,String>("consumer-tutorial",i,Integer.toString(x),Integer.toString(rnd.nextInt(100))));  } 
    start=end; 
    end = start + 10; 
} 
prod.close(); 

现在,当我试图获取记录/ CON使用以下Consumer API这个话题庙消息:

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); 
consumer.subscribe(Arrays.asList("consumer-tutorial")); 

while(true) 
{ 
    ConsumerRecords<String, String> records = consumer.poll(100); 

    for(ConsumerRecord<String,String> record: records) 
    { 
     System.out.printf("Key: %s, Value = %s", record.key(), record.value()); 
    } 

} 

同时运行此代码,我没有得到任何记录。我检查record变量,但没有KEY:VALUE对从Poll

Watch on "records" shows nothing

未来谁能帮助我,为什么我没有得到任何要显示的数据。

NOTE: It works well when I have single partition topic. 

回答

0

当你订阅主题,内部大量的操作会发生这样的1.查找组协调员2.发送到组请求,并获取组长3领导人将确保任何一个加入该组或离开团队成员4.领导者根据分配策略(范围/ RoundRobin)将分区划分给成员。 5.然后每个消费者成员从组协调员获取元数据。

那么它将取的记录。所以最初的几次迭代在轮询时不会得到任何数据。

0

据我所知,它会采取前几个迭代在我的情况然而循环取记录进入无限的'Poll'保持轮询不休。我今天晚些时候发现,当我用一些新名称重新设置组名(我使用UUID来随机化组名)时,它会使用不同的分区从主题获取数据。因此,得出结论:偏移量需要重置到开始位置,以便消费者可以从头开始获取数据,并在每次new保持偏移指针开始时保持组ID。现在我想知道什么是设置偏移的话题(不分区)开始的方式..我尝试使用'props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");“和seekseekToBeginning但他们都不会工作,除非消费者已经失去了偏移轨道..那么有没有办法通过我可以设置偏移指针开始?