2017-04-19 219 views

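I am using spring-kafka and trying to create a Kafka consumer. I followed http://howtoprogram.xyz/2016/09/23/spring-kafka-tutorial/ and https://www.codenotfound.com/2016/09/spring-kafka-consumer-producer-example.html, and I am currently using exactly the same code, but the Kafka consumer cannot be created.

Here is the Receiver class: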

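import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    // Invoked for every record received on HelloKafkaTopic
    @KafkaListener(id = "foo", topics = "HelloKafkaTopic", group = "group1")
    public void receive(ConsumerRecord<?, ?> record) {
        System.out.println(record);
        countDownLatch1.countDown();
    }
}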

Here is KafkaConsumerConfig:

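import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Only takes effect when auto commit is enabled
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Receiver listener() {
        return new Receiver();
    }
}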

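When I annotate my class, which has the @KafkaListener method, with @EnableKafka, I get the following error. I am having a hard time working out whether I made a mistake somewhere.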

java.lang.NoSuchMethodError: org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver.<init>(Lorg/springframework/messaging/converter/MessageConverter;)V 
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaHandlerMethodFactoryAdapter.createDefaultMessageHandlerMethodFactory(KafkaListenerAnnotationBeanPostProcessor.java:654) 
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaHandlerMethodFactoryAdapter.getMessageHandlerMethodFactory(KafkaListenerAnnotationBeanPostProcessor.java:630) 
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaHandlerMethodFactoryAdapter.createInvocableHandlerMethod(KafkaListenerAnnotationBeanPostProcessor.java:625) 
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.configureListenerAdapter(MethodKafkaListenerEndpoint.java:112) 
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:101) 
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:297) 
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:282) 
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:211) 
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:46) 
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:182) 
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:154) 
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:128) 
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) 
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) 
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:224) 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:792) 
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:839) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:538) 
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118) 
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:766) 
at org.springframework.boot.SpringApplication.createAndRefreshContext(SpringApplication.java:361) 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1191) 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1180) 

Answer

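I think the problem is that an older version of spring-messaging is on your classpath: the NoSuchMethodError means that the MessageMethodArgumentResolver(MessageConverter) constructor called by spring-kafka does not exist in the spring-messaging jar that was loaded at runtime. Use dependency management to pin spring-messaging to the latest 4.3.x version in your pom.xml.

First, check which version of spring-messaging you are currently using.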
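
One way to check this from inside the application is to ask the JVM where the class was loaded from and whether the constructor named in the stack trace exists. The following is only a rough sketch: `ClasspathProbe`, `locationOf`, and `hasConstructor` are hypothetical names made up for illustration, and the version line relies on the jar manifest carrying an `Implementation-Version` entry, which may not be the case for every build.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Spring or spring-kafka.
final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or "unknown" if the JVM does not expose it. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown";
        }
        return source.getLocation().toString();
    }

    /** True if the named class can be loaded and declares a constructor with exactly these parameter types. */
    public static boolean hasConstructor(String className, String... parameterTypeNames) {
        try {
            Class<?> type = Class.forName(className);
            Class<?>[] parameterTypes = new Class<?>[parameterTypeNames.length];
            for (int i = 0; i < parameterTypeNames.length; i++) {
                parameterTypes[i] = Class.forName(parameterTypeNames[i]);
            }
            type.getDeclaredConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, constructor missing, or class could not be linked
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and parameter type taken from the NoSuchMethodError in the question
        String resolver = "org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver";
        String converter = "org.springframework.messaging.converter.MessageConverter";
        try {
            Class<?> type = Class.forName(resolver);
            Package pkg = type.getPackage();
            System.out.println("loaded from: " + locationOf(type));
            // May print null if the manifest has no Implementation-Version entry
            System.out.println("version: " + (pkg == null ? null : pkg.getImplementationVersion()));
        } catch (ClassNotFoundException e) {
            System.out.println("spring-messaging is not on the classpath");
        }
        System.out.println("constructor present: " + hasConstructor(resolver, converter));
    }
}
```

Run it with the same classpath as the failing application. If the constructor is reported as missing, the jar path printed above it shows which spring-messaging artifact needs to be upgraded.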

I use Gradle to build my project. I am using org.springframework:spring-messaging:4.2.6-RELEASE

Update it to 4.3.x (whatever the latest available version is).

Yes, I figured it out. Thanks for your help.