1
我是新的火花流和卡夫卡,我不明白这个运行时异常。我已经安装了kafka服务器。Spark Streaming + kafka“JobGenerator”java.lang.NoSuchMethodError
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.streaming.scheduler.InputInfoTracker.reportInfo(Lorg/apache/spark/streaming/Time;Lorg/apache/spark/streaming/scheduler/StreamInputInfo;)V
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:166)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
,这是我的代码
public class TwitterStreaming {
// setup kafka :
public static final String ZKQuorum = "localhost:2181";
public static final String ConsumerGroupID = "ingi2145-analytics";
public static final String ListTopics = "newTweet";
public static final String ListBrokers = "localhost:9092"; // I'm not sure about ...
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
// Location of the Spark directory
String sparkHome = "usr/local/spark";
// URL of the Spark cluster
String sparkUrl = "local[4]";
// Location of the required JAR files
String jarFile = "target/analytics-1.0.jar";
// Generating spark's streaming context
JavaStreamingContext jssc = new JavaStreamingContext(
sparkUrl, "Streaming", new Duration(1000), sparkHome, new String[]{jarFile});
// Start kafka stream
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(ListTopics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", ListBrokers);
//JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroupID, mapPartitionsPerTopics);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
// get the json file :
JavaDStream<String> json = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
本项目的目的是通过使用卡夫卡队列来计算从一个Twitter流10个最好成绩主题标记。代码在没有kakfa的情况下工作。你有什么问题的想法吗?
什么是Spark的版本。 我也有同样的问题,当我使用火花1.4运行1.5版火花。 我切换到版本1.5,没关系。 – giaosudau
谢谢你的帮助:) – afaraut