
Spark Streaming + kafka "JobGenerator" java.lang.NoSuchMethodError

I am new to Spark Streaming and Kafka, and I do not understand this runtime exception. I already have a Kafka server set up.

Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.streaming.scheduler.InputInfoTracker.reportInfo(Lorg/apache/spark/streaming/Time;Lorg/apache/spark/streaming/scheduler/StreamInputInfo;)V 
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:166) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) 
at scala.Option.orElse(Option.scala:257) 

Here is my code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class TwitterStreaming {
    // Kafka setup:
    public static final String ZKQuorum = "localhost:2181";
    public static final String ConsumerGroupID = "ingi2145-analytics";
    public static final String ListTopics = "newTweet";
    public static final String ListBrokers = "localhost:9092"; // I'm not sure about ...

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        // Location of the Spark directory
        String sparkHome = "/usr/local/spark";
        // URL of the Spark cluster
        String sparkUrl = "local[4]";
        // Location of the required JAR files
        String jarFile = "target/analytics-1.0.jar";

        // Create the streaming context with a 1-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkUrl, "Streaming", new Duration(1000), sparkHome, new String[]{jarFile});

        // Start Kafka stream
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(ListTopics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", ListBrokers);

        //JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroupID, mapPartitionsPerTopics);
        // Create direct Kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );

        // Get the raw JSON of each tweet (the value of each Kafka record):
        JavaDStream<String> json = messages.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

        // ... (rest of the pipeline, jssc.start() and jssc.awaitTermination() omitted in the question)
    }
}

The goal of this project is to compute the 10 most popular hashtags from a Twitter stream by pushing the tweets through a Kafka queue. The code works fine without Kafka. Do you have any idea what the problem is?
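For reference, the continuation of the pipeline (the part the snippet above leaves out) might look roughly like the sketch below. This is not the code from the question: it assumes the json DStream built above, naively treats any whitespace-separated token starting with '#' as a hashtag instead of parsing the tweet JSON, and counts over an assumed 60-second window.

// Sketch only, not the original code. In addition to the imports above, this uses
// org.apache.spark.api.java.JavaPairRDD, org.apache.spark.streaming.api.java.JavaPairDStream
// and org.apache.spark.api.java.function.{FlatMapFunction, PairFunction, Function2}.
JavaDStream<String> hashTags = json.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String tweet) {
        // Naive tokenisation of the raw tweet JSON; a real job would parse the JSON first.
        return Arrays.asList(tweet.split("\\s+"));
    }
}).filter(new Function<String, Boolean>() {
    public Boolean call(String word) {
        return word.startsWith("#");
    }
});

// Count each hashtag over the last 60 seconds.
JavaPairDStream<String, Integer> counts = hashTags.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String tag) {
        return new Tuple2<String, Integer>(tag, 1);
    }
}).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}, new Duration(60 * 1000));

// Swap to (count, tag), sort by count in descending order and print the top 10.
counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    public Tuple2<Integer, String> call(Tuple2<String, Integer> t) {
        return t.swap();
    }
}).transformToPair(new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
    public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> rdd) {
        return rdd.sortByKey(false);
    }
}).print(10);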


Which version of Spark are you using? I had the same problem when I was running version 1.5 against Spark 1.4. I switched to version 1.5 and it was fine. – giaosudau


Thank you for your help :) – afaraut

Answer


I had the same problem, and it came down to the Spark version I was using. I tried 1.5, then 1.4, and the version that finally worked for me was 1.6. So make sure the Kafka version you use is compatible with your Spark version. In my case I used spark-1.6.0-bin-hadoop2.3 with Kafka 2.10-0.10.1.1.
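For this particular stack trace, the usual cause is that the spark-streaming-kafka jar on the classpath was built for a newer Spark release (1.5+, where the signature of InputInfoTracker.reportInfo changed) than the Spark runtime actually executing the job. A quick way to confirm the runtime version, assuming the jssc variable from the question's code:

// Sketch: print the Spark version the driver is really running against, so it can be
// compared with the version of the spark-streaming-kafka_2.10 dependency the job was
// built with (for Spark 1.x the two should carry the same version number, e.g. 1.6.0).
System.out.println("Spark runtime version: " + jssc.sparkContext().version());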

Also, and this is very important, make sure you are not getting any permission ("forbidden") errors in your log files. You have to grant the proper permissions on the folders Spark uses, otherwise you may get many errors that have nothing to do with the application itself but with an installation that is not set up correctly.
