2017-04-10

I recently started working with Spark Streaming and implemented checkpointing, storing the checkpoints in HDFS. When the streaming application fails, it goes back to the last checkpoint, but then gets a NullPointerException and the streaming job is killed. I can see the checkpoint in HDFS, so I don't understand why I get the exception even though the checkpoint is there. Any input would help. Why does the streaming job fail with a NullPointerException when restarting from a checkpoint?

17/04/10 16:30:47 INFO JobGenerator: Batches during down time (2 batches):1491841680000 ms, 1491841800000 ms 
17/04/10 16:30:47 INFO JobGenerator: Batches pending processing (0 batches): 
17/04/10 16:30:47 INFO JobGenerator: Batches to reschedule (2 batches): 1491841680000 ms, 1491841800000 ms 
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841680000 ms 
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms 
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34 
17/04/10 16:30:48 INFO DAGScheduler: Got job 0 (isEmpty at piadj.scala:34) with 1 output partitions 
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 0 (isEmpty at piadj.scala:34) 
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List() 
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List() 
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32), which has no missing parents 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 4.1 KB) 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 6.1 KB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB) 
17/04/10 16:30:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1008 
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32) 
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, oser402370.wal-mart.com, partition 0,ANY, 2108 bytes) 
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841800000 ms 
17/04/10 16:30:48 INFO RecurringTimer: Started timer for JobGenerator at time 1491841920000 
17/04/10 16:30:48 INFO JobGenerator: Restarted JobGenerator at 1491841920000 ms 
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841800000 ms.0 from job set of time 1491841800000 ms 
17/04/10 16:30:48 INFO JobScheduler: Started JobScheduler 
17/04/10 16:30:48 INFO StreamingContext: StreamingContext started 
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34 
17/04/10 16:30:48 INFO DAGScheduler: Got job 1 (isEmpty at piadj.scala:34) with 1 output partitions 
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at piadj.scala:34) 
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List() 
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List() 
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32), which has no missing parents 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 10.2 KB) 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 12.3 KB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB) 
17/04/10 16:30:48 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008 
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32) 
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks 
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, XXXXXXX, partition 0,ANY, 2108 bytes) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB) 
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1142 ms on XXXXXXX (1/1) 
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 0 (isEmpty at piadj.scala:34) finished in 1.151 s 
17/04/10 16:30:49 INFO DAGScheduler: Job 0 finished: isEmpty at piadj.scala:34, took 1.466449 s 
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 979 ms on XXXXXXX (1/1) 
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 1 (isEmpty at piadj.scala:34) finished in 0.983 s 
17/04/10 16:30:49 INFO DAGScheduler: Job 1 finished: isEmpty at piadj.scala:34, took 1.006658 s 
17/04/10 16:30:49 INFO JobScheduler: Finished job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms 
17/04/10 16:30:49 INFO JobScheduler: Total delay: 169.575 s for time 1491841680000 ms (execution: 1.568 s) 
17/04/10 16:30:49 ERROR JobScheduler: Error running job streaming job 1491841680000 ms.0 
java.lang.NullPointerException 
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638) 
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558) 
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362) 
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623) 
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:39) 
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:33) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Below is my code:

def createContext(brokers: String, topics: String, checkpointDirectory: String): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("pi")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  sqlContext.setConf("hive.exec.dynamic.partition", "true")
  sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint(checkpointDirectory)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
  val lines = messages.map(_._2)
  lines.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      import sqlContext.implicits._
      val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj", x))
      val rdd3 = rdd2.map(x => new String(x, "UTF-8"))
      val df1 = sqlContext.read.json(rdd3)
      /* some other transformations and inserting into Hive */
    }
  }
  ssc
}

def main(args: Array[String]) {
  if (args.length < 3) {
    System.err.println("Usage: streaming <brokers> <topics> <checkpointDirectory>")
    System.exit(1)
  }
  val Array(brokers, topics, checkpointDirectory) = args
  val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(brokers, topics, checkpointDirectory))
  ssc.start()
  ssc.awaitTermination()
}

Did you find any solution for this issue? Could you share the solution? – yoga

Answer


TL;DR Move the code that creates the Kafka DStream and the foreachRDD out of createContext into main and use it there.

According to the scaladoc of StreamingContext.getOrCreate:

Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the StreamingContext will be created by calling the provided creatingFunc.

Although it may not say so explicitly, the creatingFunc should only create a new StreamingContext, possibly with checkpointing enabled. Nothing else.

You should move the code that creates the Kafka DStream and the foreachRDD out of createContext and make it part of main (right after ssc is initialized and before start is called).
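A rough sketch of what that restructuring could look like, reusing the identifiers from the question. This is an outline of the suggestion above, not a tested program; the body of foreachRDD (deserialization and the Hive insert) is elided, and obtaining the SQLContext from the RDD's SparkContext inside the closure is a common pattern assumed here, not something the question's code does:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.{DefaultDecoder, StringDecoder}

object piadj {
  // createContext now does nothing but build the context and enable
  // checkpointing -- no DStreams, no SQLContext.
  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("pi")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    val Array(brokers, topics, checkpointDirectory) = args
    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory, () => createContext(checkpointDirectory))

    // The DStream wiring now lives in main, after ssc exists and
    // before start is called.
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[
      String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)

    messages.map(_._2).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Look the SQLContext up from the RDD's SparkContext rather than
        // capturing one created at startup, so a valid instance is used
        // after recovery from a checkpoint.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        /* deserialize, build the DataFrame, insert into Hive as in the question */
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```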


Should I move the checkpointing outside of 'createContext' as well? What should remain inside 'createContext'? – pharpan


I would do it outside 'createContext', but would also check what happens when it is inside. I think both should work. –


@JacekLaskowski You recommend defining the DStreams and their associated operations (i.e., foreachRDD) outside of 'creatingFunc'. When the streaming application recovers from a failure, won't the previously created DStream operations fail to be restored, so that they have to be declared again? – autodidacticon
