
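Local Kafka application fails: NoSuchMethodError: createEphemeral

I am trying to run my Spark application in local mode. To set it up, I followed this tutorial (in French): http://blog.d2-si.fr/2015/11/05/apache-kafka-3/ which shows each step of building a local Kafka/ZooKeeper environment.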

I am using IntelliJ with the following configuration:

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]") 

The run configuration (program arguments) I use for the consumer is:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1" 

And for the producer:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300 

Beforehand, I checked that a consumer received the producer's input using the default console-producer and console-consumer shipped with kafka_2.10-0.9.0.1; it did.

However, I am getting the following error:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V 
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921) 
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348) 
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) 
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636) 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626) 
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) 
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:254) 
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:156) 
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

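I have not managed to resolve this. I first thought it was a ZooKeeper configuration error, but after comparing with a working version of the application on another computer that uses the same configuration files, that no longer seems to be the case.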

Answer


It looks like you have a dependency problem here.

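Check the version of the com.101tec.zkclient library on your classpath. Kafka needs version 0.7. The error says that ZkClient.createEphemeral(String, Object, List), which Kafka calls at ZkUtils.scala:921, does not exist in the ZkClient class that was actually loaded, so an older zkclient jar is probably taking precedence.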
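One way to run that check from inside the application is a small reflection probe. This is only a sketch: the object name `ZkClientCheck` and its helpers are hypothetical, and the only names taken from the question are the class `org.I0Itec.zkclient.ZkClient` and the method signature from the stack trace. It prints which jar the class was loaded from and whether the three-argument overload exists.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic helper; not part of Kafka, Spark or zkclient.
object ZkClientCheck {

  // Location (usually a jar URL) the class was loaded from, if the JVM reports one.
  def jarOf(cls: Class[_]): String =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<unknown source>")

  // True if the class exposes a public method with this name and parameter types.
  def hasMethod(cls: Class[_], name: String, params: Class[_]*): Boolean =
    Try(cls.getMethod(name, params: _*)).isSuccess

  def main(args: Array[String]): Unit =
    Try(Class.forName("org.I0Itec.zkclient.ZkClient")) match {
      case Success(cls) =>
        println(s"ZkClient loaded from: ${jarOf(cls)}")
        // Signature taken from the NoSuchMethodError in the stack trace.
        val present = hasMethod(cls, "createEphemeral",
          classOf[String], classOf[Object], classOf[java.util.List[_]])
        println(s"createEphemeral(String, Object, List) present: $present")
      case Failure(e) =>
        println(s"ZkClient is not on the classpath: $e")
    }
}
```

Running it with the same classpath as the failing IntelliJ run configuration should show which zkclient jar wins; if the method is reported as missing, that jar is the one to replace or exclude.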

Also, as of kafka_2.10-0.9.0.1 the new producer and consumer APIs no longer use ZooKeeper. Spark Streaming appears to be using version 0.8 of Kafka in your case.
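If the project is built with sbt (an assumption; the question does not say which build tool is used), one possible way to force the zkclient version mentioned above is a dependency override in build.sbt. The coordinates below are the published ones for zkclient 0.7; whether an override is the right fix, rather than aligning the Kafka and Spark Streaming versions, depends on the rest of the build.

```scala
// build.sbt fragment (sketch, assuming an sbt build):
// force every transitive zkclient dependency to resolve to 0.7
dependencyOverrides += "com.101tec" % "zkclient" % "0.7"
```

With Maven, the equivalent would be pinning the same artifact in dependencyManagement.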


The answer is simple, but it was hard to figure out. This solved my problem. Many thanks. – wipman
