Spark Streaming serialization error: I am getting a serialization error when running my Spark Streaming application. Below is my driver code:

package com.test

import org.apache.spark._
import org.apache.spark.streaming._
import org.json.JSONObject
import java.io.Serializable

object SparkFiller extends Serializable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkFiller").setMaster("local[*]")
    // println("test")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[firehoseToDocumentDB]))
    sparkConf.registerKryoClasses(Array(classOf[PushToDocumentDB]))
    var TimeStamp_Start = 1493836050
    val TimeStamp_Final = 1493836056
    var timeStamp_temp = TimeStamp_Start - 5
    // val send_timestamps = new firehoseToDocumentDB(TimeStamp_Start,TimeStamp_Final);
    // send_timestamps.onStart();
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.receiverStream(
      new firehoseToDocumentDB(TimeStamp_Start.toString(), TimeStamp_Final.toString()))
    // val timestamp_stream = ssc.receiverStream(new firehoseToDocumentDB(TimeStamp_Start.toString(),TimeStamp_Final.toString()))
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val dbsender = new PushToDocumentDB()
        part.foreach(msg => {
          var jsonobject = new JSONObject(part)
          var temp_pitr = jsonobject.getString("pitr")
          println(temp_pitr)
          if (TimeStamp_Final >= temp_pitr.toLong) {
            ssc.stop()
          }
          dbsender.PushFirehoseMessagesToDocumentDb(msg)
        })
        // dbsender.close()
      })
    })

    println("ankush")
    ssc.start()
    ssc.awaitTermination()
  }
}

When I add the following lines to the code:

var jsonobject = new JSONObject(part)
var temp_pitr = jsonobject.getString("pitr")
println(temp_pitr)
if (TimeStamp_Final >= temp_pitr.toLong) {
  ssc.stop()
}

I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
at com.boeing.SparkFiller$$anonfun$main$1.apply(SparkFiller.scala:26) 
at com.boeing.SparkFiller$$anonfun$main$1.apply(SparkFiller.scala:25) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext 
Serialization stack: 
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...) 
- field (class: com.boeing.SparkFiller$$anonfun$main$1, name: ssc$1, type: 
class org.apache.spark.streaming.StreamingContext) 
- object (class com.boeing.SparkFiller$$anonfun$main$1, <function1>) 
- field (class: com.boeing.SparkFiller$$anonfun$main$1$$anonfun$apply$1, 
name: $outer, type: class com.boeing.SparkFiller$$anonfun$main$1) 
- object (class com.boeing.SparkFiller$$anonfun$main$1$$anonfun$apply$1, <function1>) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
... 30 more 

Process finished with exit code 1 

If I remove those lines of code, it works fine.

The issue is because of using ssc.stop() in the RDD. Is there any other way I can call a shutdown hook from the RDD if the condition is satisfied?


You cannot execute 'ssc.stop()' inside 'foreachRDD'; you should wait for it to finish. – freedev


Why not add 'ssc.stop()' after 'ssc.awaitTermination()'? – freedev


@freedev The reason I kept it there is that I want to stop the Spark execution if the if condition is satisfied. –

Answer


Issue is because of using the ssc.stop() in the rdd.

You are right! Any Spark context is not serializable and cannot be used inside any task.
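
To make the driver/executor boundary concrete: everything passed to rdd.foreachPartition and part.foreach is serialized and shipped to the executors, which is why a closure that references ssc fails. The body of foreachRDD itself runs on the driver, so the stop condition can be reduced to a plain value there and recorded in a driver-side flag, leaving the actual shutdown to the main thread (see the sketch after the answer). This is only a minimal sketch, assuming the receiver emits JSON strings with a "pitr" field; the stopRequested flag and the max() reduction are illustrative and not part of the original code:

// In SparkFiller, add a driver-side flag that executor code never sees:
@volatile private var stopRequested = false   // field of the SparkFiller object

// and replace the foreachRDD body in main() with:
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Runs on the executors: parse each message and keep only the numeric "pitr" value.
    val maxPitr = rdd.map(msg => new JSONObject(msg).getString("pitr").toLong).max()
    // Back on the driver: only a Long crossed the wire, not the StreamingContext.
    if (TimeStamp_Final >= maxPitr) {
      stopRequested = true
    }
  }
  rdd.foreachPartition { part =>
    val dbsender = new PushToDocumentDB()
    part.foreach(msg => dbsender.PushFirehoseMessagesToDocumentDb(msg))
  }
}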

is there any otherway that I can call a shutdown hook from the rdd if it satisfies the condition.

To control the lifecycle of your streaming application, you should consider overriding a listener and stopping the context based on your condition. I have done enough research and found that this was the only possible solution.

Please refer to my answer to this post to see how to stop a streaming application based on a certain condition.
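
For reference, this is the general shape of that approach (a sketch written here, not copied from the linked post): register a StreamingListener, set a flag when the stopping condition is observed, and let the main thread poll with awaitTerminationOrTimeout instead of blocking forever, stopping the context gracefully once the flag is set. The stopRequested flag and the 10-second poll interval are assumptions for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object GracefulStopSketch {
  @volatile private var stopRequested = false

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GracefulStopSketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // ... define the input DStream and at least one output operation here,
    // setting stopRequested from driver-side code (e.g. inside foreachRDD) ...

    // Optionally observe batch progress and decide whether to stop.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // Inspect batchCompleted.batchInfo and set stopRequested when the condition holds.
      }
    })

    ssc.start()
    var stopped = false
    while (!stopped) {
      // Returns true once the context has terminated on its own.
      stopped = ssc.awaitTerminationOrTimeout(10000L)
      if (!stopped && stopRequested) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}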