我可以在命令行上发送和接收消息以对抗Kafka位置安装。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我也有一个卡夫卡用户的Java代码。该代码昨天收到消息。但是,它今天早上没有收到任何消息。代码没有改变。我想知道属性配置是否不完全正确。下面是我的配置:卡夫卡消费者 - 接收消息不一致
生产者:
bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()
和ProducerRecord设置为
ProducerRecord<String, String>("test", "mykey", "myvalue")
消费者:
zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer
和Java代码:
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
缺什么?