2017-09-12 143 views
0

我们有一个生产者&一个消费者&一个分区。消费者/生产者都是弹簧启动应用程序。消费者应用在我的本地机器上运行,而制作者与远程机器上的kafka &动物园管理员一起运行。kafka生产者/消费者重新启动后,消费者没有收到消息

在开发过程中,我重新部署了生产者应用程序并进行了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有运气。什么是问题和/或如何解决?

消费者配置:

spring: 
    cloud: 
    stream: 
     defaultBinder: kafka 
     bindings: 
     input: 
      destination: sales 
      content-type: application/json 
     kafka: 
     binder: 
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      defaultZkPort: 2181 
      defaultBrokerPort: 9092 
server: 
    port: 0 

制片配置

cloud: 
stream: 
    defaultBinder: kafka 
    bindings: 
    output: 
     destination: sales 
     content-type: application/json 
    kafka: 
    binder: 
     brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     defaultZkPort: 2181 
     defaultBrokerPort: 9092 

EDIT2

5分钟后,消费者应用具有以下异常死亡:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder 
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder: 
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel 
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel 
+0

听起来很简单。你介意在GitHub的某个地方分享这个应用程序,这样我们就可以在本地重现问题了吗? –

+0

@ArtemBilan我很抱歉,但我无法分享我的代码。你需要什么细节来提出解决方案? – LazyTechie

+0

我没有代码没有想法。也许你可以分享给消费者和制作人的配置?是的,我知道你不能共享整个应用程序,但至少可以为我们想出一些简单的Spring Boot应用程序... –

回答

0

那么,它看起来好像已经有一个bugspring-cloud-stream-binder-kafka报告说明resetOffset属性没有影响。因此,消费者总是要求消息的偏移量为latest

正如在git问题中提到的,唯一的解决方法是通过kafka使用者CLI工具修复此问题。

1

看看上面关于DEBUG的建议是否显示了更多的信息。它看起来像是从KafkaTopicProvisioner获得了一些Timeout异常。但是,当您重新启动消费者时,会出现这种情况。它看起来像消费者以某种方式与经纪人沟通有一些麻烦,你需要找出那里正在发生的事情。