2016-07-08

**spring-kafka (not spring-integration-kafka) consumer not consuming messages**

I am integrating my application with spring-kafka (not spring-integration-kafka). Here is the Spring documentation for the project: http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsingle

My producer works perfectly, but the consumer is not consuming any messages. Any pointers?

Here is my configuration:

@EnableKafka
public class MyConfig {

    @Value("${kafka.broker.list}") // List of servers server:port
    private String kafkaBrokerList;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(12);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setAutoStartup(Boolean.TRUE);
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, Message> consumerFactory() {
        JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000);
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
        return props;
    }

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload Message message) {
        System.out.println(message);
    }

}

**Edit with details in response**

Thanks Gary. I do not see any exceptions in the logs. Also, I tried a KafkaTemplate with a similar configuration and I am able to publish messages to the queue, but for the consumer, no luck. I am changing the code to use String instead of my Message object. Here is part of the log:

2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values: 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    group.id = 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    max.partition.fetch.bytes = 1048576 
    bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093] 
    retry.backoff.ms = 10000 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    enable.auto.commit = true 
    ssl.key.password = null 
    fetch.max.wait.ms = 500 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 60000 
    ssl.truststore.password = null 
    session.timeout.ms = 15000 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer 
    ssl.protocol = TLS 
    check.crcs = true 
    request.timeout.ms = 40000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.keystore.location = null 
    heartbeat.interval.ms = 3000 
    auto.commit.interval.ms = 10000 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    fetch.min.bytes = 1 
    send.buffer.bytes = 131072 
    auto.offset.reset = latest 

I also do see the following log entries:

2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
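One detail that stands out in the ConsumerConfig dump above is that group.id is empty. As a hedged sketch only (the group name "myGroup" is a placeholder, not something from the original post), the consumer properties would normally include an explicit consumer group so the broker can coordinate partitions and track offsets for this consumer:

```java
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
    // Explicit consumer group; the log above shows "group.id =" (empty),
    // which is worth double-checking ("myGroup" is a placeholder value).
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
    return props;
}
```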

Your code looks good, so I am not sure which kind of sample you are looking for. Even if we add one, it will be very simple and without too many configuration options. So, either you are connected to the wrong Kafka and/or topic, or there is something in the logs indicating some error. –


Thanks for the response, Gary. I do not see any exceptions in the logs. Also, I tried a KafkaTemplate with a similar configuration and I can publish messages to the queue, but for the consumer, no luck. I am changing the code to use String instead of my Message object. Due to the comment length limit, I have added this to the main post. – Shailesh


You can trace more information in the logs after adding this category: `org.apache.kafka.clients = DEBUG` –

Answer


The documentation mentioned above says:

Although the Serializer/Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, it is not enough on the Messaging level, where KafkaTemplate and @KafkaListener are present. To easily convert to/from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter customization.

But in your case you combine that MessageConverter:

 factory.setMessageConverter(new StringJsonMessageConverter()); 

with a custom Deserializer:

 JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class); 

The simplest way to fix your case would be to use a StringDeserializer instead:

https://kafka.apache.org/090/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html
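Concretely, the suggestion amounts to something like the following sketch of the consumer factory (a sketch, not the poster's exact fix; the rest of the original configuration is assumed unchanged):

```java
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    // Let Kafka hand back the raw JSON String; the StringJsonMessageConverter
    // configured on the container factory then converts that String into
    // the listener's @Payload type.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new IntegerDeserializer(), new StringDeserializer());
}
```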

Regarding the log message quoted above, Marking the coordinator XXX dead., the error is not related to the spring-kafka project; it means that the problem is in your Kafka configuration. In my case we saw such issues when a Kafka node could not reach ZooKeeper. To fix the problem, I would suggest installing both Kafka & ZooKeeper locally and making sure that producing/consuming works against that installation, using kafka-console-producer and kafka-console-consumer, e.g.:

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html

Then, as a next stage, you can check your sample spring-kafka application against the same local installation.
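For that check, a stripped-down listener that takes the raw String payload keeps JSON conversion out of the picture entirely (a sketch; the topic name mirrors the original post):

```java
@KafkaListener(topics = "myTopic")
public void listen(String message) {
    // Print the raw payload to confirm that messages actually arrive
    // before reintroducing the JSON deserialization pieces.
    System.out.println(message);
}
```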


At the same time, `JsonDeserializer` allows using a POJO as the payload, extracting its instance from the Kafka message. – stepio


Thanks stepio. I tried removing the StringJsonMessageConverter, but I still do not see messages being consumed. To reduce complexity, I am changing the code to use a StringDeserializer instead of the custom JSON deserializer for testing: `return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), new StringDeserializer());` and the consumer is changed to: `containerProps.setMessageListener((MessageListener<Integer, String>) message -> { System.out.println(message.value()); });` – Shailesh