
Spark Streaming job fails with ArrayBuffer(kafka.common.NotLeaderForPartitionException)

My Spark Streaming job (Spark 1.6.1, Kafka 0.9.0) consumes from a Kafka topic with 20 partitions. The offsets are maintained in an Oracle database.

At job start-up I read the offsets from Oracle (only once), and after processing each batch I write the updated offsets back to Oracle.
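
Roughly, the offset handling looks like the sketch below (the Oracle helpers, broker list, and batch interval are placeholders, not the real job code), using the Spark 1.6 direct Kafka API:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetManagedJob {
  // Placeholders standing in for the JDBC calls that read/write the offset table in Oracle.
  def readOffsetsFromOracle(): Map[TopicAndPartition, Long] = Map.empty
  def writeOffsetsToOracle(ranges: Array[OffsetRange]): Unit = ()

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("offset-managed-job"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker list

    // Offsets are read from Oracle exactly once, when the job starts.
    val fromOffsets: Map[TopicAndPartition, Long] = readOffsetsFromOracle()

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.foreachRDD { rdd =>
      // The direct stream exposes the offset range consumed in this batch.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch ...
      writeOffsetsToOracle(ranges) // persist offsets only after successful processing
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```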

The job ran successfully for about 8 hours and then failed with the error below. Nothing about the Kafka topic, the Spark program, or the Oracle code changed around the time of the failure.

Can anyone tell me why this error occurred in a Spark Streaming job that was already running?

16/11/02 08:09:21 ERROR JobScheduler: Error generating jobs for time 1478074160000 ms 
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([MyTopic,11])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58) 
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) 
Caused by: org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([MyTopic,11])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
16/11/02 08:09:21 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 
16/11/02 08:09:21 INFO JobGenerator: Stopping JobGenerator immediately 
16/11/02 08:09:21 INFO RecurringTimer: Stopped timer for JobGenerator after time 1478074160000 
16/11/02 08:09:21 INFO JobGenerator: Stopped JobGenerator 
16/11/02 08:09:21 INFO JobScheduler: Stopped JobScheduler 
16/11/02 08:09:21 INFO StreamingContext: StreamingContext stopped successfully 
16/11/02 08:09:21 INFO SparkContext: Invoking stop() from shutdown hook 
16/11/02 08:09:21 INFO SparkUI: Stopped Spark web UI at http://10.251.228.103:4040 
16/11/02 08:09:21 INFO SparkDeploySchedulerBackend: Shutting down all executors 
16/11/02 08:09:21 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 
16/11/02 08:09:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/11/02 08:09:21 INFO MemoryStore: MemoryStore cleared 
16/11/02 08:09:21 INFO BlockManager: BlockManager stopped 
16/11/02 08:09:21 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/11/02 08:09:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/11/02 08:09:21 INFO SparkContext: Successfully stopped SparkContext 
16/11/02 08:09:21 INFO ShutdownHookManager: Shutdown hook called 
16/11/02 08:09:21 INFO ShutdownHookManager: Deleting directory /app/spark/spark-1.6.1-bin-hadoop2.6/local/spark-30fb329c-3ccf-4d8c-a06c-2d36e6f968b3/httpd-f81472a2-3262-4eea-8d64-7ff96d2ef3e5 
16/11/02 08:09:21 INFO ShutdownHookManager: Deleting directory /app/spark/spark-1.6.1-bin-hadoop2.6/local/spark-30fb329c-3ccf-4d8c-a06c-2d36e6f968b3 

Take a look at the Kafka/ZooKeeper logs. Check that everything is up and running normally and that the disks are not full.
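
One way to check from code which broker (if any) currently leads each partition is a metadata request via Kafka's old Scala SimpleConsumer API, which ships with Kafka 0.9. This is only a sketch; the broker host/port below are placeholders, and any live broker in the cluster can answer the request:

```scala
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

// Placeholder broker address; soTimeout = 10s, bufferSize = 64 KB.
val consumer = new SimpleConsumer("broker1", 9092, 10000, 64 * 1024, "leader-check")
try {
  val request = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, "leader-check", Seq("MyTopic"))
  val response = consumer.send(request)
  for (tm <- response.topicsMetadata; pm <- tm.partitionsMetadata) {
    // A partition whose leader is None would explain NotLeaderForPartitionException.
    println(s"partition ${pm.partitionId}: leader = ${pm.leader}")
  }
} finally {
  consumer.close()
}
```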

Answers


For me, the problem was nothing more than my Kafka server having gone down. It was easy enough to start it back up:

./bin/kafka-server-start.sh -daemon config/server.properties 

Especially since you store the topic/offset information externally: it looks like **Set([MyTopic,11])** comes from your Oracle database, but Kafka cannot find a matching topic and offset. – sbrannon
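
If stale externally stored offsets are the culprit (for example, retention removed data while the job was down), one defensive option is to clamp whatever Oracle returns into the range the brokers still report before building the stream. This is only a sketch; fetching the earliest/latest offset maps from Kafka is left out here:

```scala
import kafka.common.TopicAndPartition

// Hypothetical guard: keep each stored offset within [earliest, latest] as reported by Kafka,
// so the job does not start from an offset the brokers no longer (or do not yet) have.
def clampOffsets(stored: Map[TopicAndPartition, Long],
                 earliest: Map[TopicAndPartition, Long],
                 latest: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
  stored.map { case (tp, offset) =>
    tp -> math.min(math.max(offset, earliest(tp)), latest(tp))
  }
```

The clamped map would then be passed as the fromOffsets argument when creating the direct stream.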