2016-06-23 153 views
0

我可以在命令行上发送和接收消息以对抗Kafka位置安装。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我也有一个卡夫卡用户的Java代码。该代码昨天收到消息。但是,它今天早上没有收到任何消息。代码没有改变。我想知道属性配置是否不完全正确。下面是我的配置:卡夫卡消费者 - 接收消息不一致

生产者:

bootstrap.servers - localhost:9092 
group.id - test 
key.serializer - StringSerializer.class.getName() 
value.serializer - StringSerializer.class.getName() 

和ProducerRecord设置为

ProducerRecord<String, String>("test", "mykey", "myvalue") 

消费者:

zookeeper.connect - "localhost:2181" 
group.id - "test" 
zookeeper.session.timeout.ms - 500 
zookeeper.sync.time.ms - 250 
auto.commit.interval.ms - 1000 
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer 

和Java代码:

Map<String, Integer> topicCount = new HashMap<>(); 
    topicCount.put("test", 1); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer 
      .createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 

缺什么?

回答

0

许多事情可能正在发生。

首先,您的客户的ZooKeeper会话超时时间非常短,这意味着由于垃圾收集暂停,消费者可能会遇到很多“软故障”。发生这种情况时,消费者群体将重新平衡,这可能会暂停消费。如果这种情况非常频繁发生,消费者可能会进入一种不会消费信息的状态,因为它不断地被重新平衡。我建议将ZooKeeper会话超时时间增加到30秒,以查看这是否可以解决问题。如果是这样,您可以尝试将其设置得更低。

其次,您是否可以确认新消息正在生成“测试”主题?您的消费者只会消费尚未提交的新消息。这个主题可能没有任何新消息。

第三,您是否有同一消费群体中的其他消费者可以处理消息?如果一个消费者经常发生软故障,其他消费者将被分配其分区。

最后,您正在使用最终将被删除的“旧”消费者。如果可能的话,我建议转向卡夫卡0.9中提供的“新”消费者(KafkaConsumer.java)。虽然我不能保证这会解决你的问题。

希望这会有所帮助。