2016-12-31 78 views
0

我使用的 Apache Kafka 版本是 kafka_2.10-0.10.1.0。Apache Kafka 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V

作为一个 PoC(概念验证),我创建了简单的生产者和消费者。当我尝试消费消息时,得到以下错误:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V 
at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59) 

代码:

package com.spnotes.kafka.simple; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.errors.WakeupException; 

import java.util.Arrays; 
import java.util.Properties; 
import java.util.Scanner; 


/**
 * Minimal command-line Kafka consumer: subscribes to one topic and prints the
 * value of every record it receives until the user types {@code exit} on
 * standard input (or standard input reaches end-of-file).
 *
 * <p>Usage: {@code Consumer <topicName> <groupId>}
 */
public class Consumer {
    private static Scanner in;

    /**
     * Starts the consumer thread, waits for the {@code exit} token on standard
     * input, then wakes the consumer up and waits for it to terminate.
     *
     * @param argv exactly two arguments: the topic name and the consumer group id
     * @throws Exception if the main thread is interrupted while joining the
     *                   consumer thread, or if the consumer cannot be created
     */
    public static void main(String[] argv) throws Exception {
        if (argv.length != 2) {
            System.err.printf("Usage: %s <topicName> <groupId>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);
        String topicName = argv[0];
        String groupId = argv[1];

        ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
        consumerRunnable.start();

        // Stop on the "exit" token; also stop on end-of-input instead of letting
        // Scanner.next() throw NoSuchElementException and leave the consumer
        // thread running with nobody left to wake it up.
        String line = "";
        while (!line.equals("exit") && in.hasNext()) {
            line = in.next();
        }

        // wakeup() is the one KafkaConsumer method that may be called from a
        // thread other than the polling one; it makes the current or next
        // poll() throw WakeupException.
        consumerRunnable.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerRunnable.join();
    }

    /**
     * Thread that owns the {@link KafkaConsumer}, polls it in a loop and prints
     * each record value to standard output.
     */
    private static class ConsumerThread extends Thread {
        private final String topicName;
        private final String groupId;
        private final KafkaConsumer<String, String> kafkaConsumer;

        /**
         * Builds the consumer eagerly so that {@link #getKafkaConsumer()} never
         * returns {@code null}, even if it is called before {@link #run()} has
         * started executing on the new thread.
         *
         * @param topicName topic to subscribe to
         * @param groupId   consumer group id
         */
        public ConsumerThread(String topicName, String groupId) {
            this.topicName = topicName;
            this.groupId = groupId;

            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Both deserializers must produce String to match KafkaConsumer<String, String>;
            // a byte[] key deserializer would fail with ClassCastException as soon as a key is read.
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

            this.kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
        }

        /**
         * Subscribes to the topic and prints record values until
         * {@link KafkaConsumer#wakeup()} interrupts the poll loop. The consumer
         * is always closed on the way out, including when subscribing fails.
         */
        @Override
        public void run() {
            try {
                kafkaConsumer.subscribe(Arrays.asList(topicName));
                //Start processing messages
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            } catch (WakeupException ex) {
                // Expected shutdown path triggered from main().
                System.out.println("Exception caught " + ex.getMessage());
            } finally {
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }

        /**
         * @return the consumer owned by this thread; never {@code null}. Only
         *         {@code wakeup()} should be called on it from other threads.
         */
        public KafkaConsumer<String, String> getKafkaConsumer() {
            return this.kafkaConsumer;
        }
    }
}

使用以下命令运行:

java -cp .:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/*:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/KafkaAPIClient-1.0-SNAPSHOT.jar com.spnotes.kafka.simple.Consumer test group1 

错误:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V 
    at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59) 
+0

KafkaConsumer.subscribe(java.util.List) 是 0.9.0 的 API。请检查您使用的客户端版本,确保使用的是 0.10.*。 – amethystic

回答

2

我用错了 API 版本,感谢 amethystic。换成新版本的客户端之后,程序就可以正常工作了。

POM 中原来使用的旧版 API 依赖:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 

在 POM 中改用的新版 API 依赖:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 

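Kafka 客户端 API 的新旧版本之间不保证向后兼容:0.9.0 中的 subscribe(List) 在 0.10 中变成了 subscribe(Collection),因此用旧版本编译的代码在新版本的 jar 上运行时会抛出 NoSuchMethodError。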