
Spark 1.6.0 executors die with a ClassCastException, causing a timeout

I'm trying to fit a Spark ML pipeline, but my executors die. The project is also on GitHub. Here's the script that doesn't work (slightly simplified):

// Prepare data sets 
logInfo("Getting datasets") 
val emoTrainingData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet") 
val trainingData = emoTrainingData 

// Configure the pipeline 
val pipeline = new Pipeline().setStages(Array(
    new FeatureReducer().setInputCol("raw_text").setOutputCol("reduced_text"), 
    new StringSanitizer().setInputCol("reduced_text").setOutputCol("text"), 
    new Tokenizer().setInputCol("text").setOutputCol("raw_words"), 
    new StopWordsRemover().setInputCol("raw_words").setOutputCol("words"), 
    new HashingTF().setInputCol("words").setOutputCol("features"), 
    new NaiveBayes().setSmoothing(0.5).setFeaturesCol("features"), 
    new ColumnDropper().setDropColumns("raw_text", "reduced_text", "text", "raw_words", "words", "features") 
)) 

// Fit the pipeline 
logInfo(s"Training model on ${trainingData.count()} rows") 
val model = pipeline.fit(trainingData) 

It executes up to the last line. It prints "Training model on xx rows", then starts fitting. Then the executors die, the driver stops receiving heartbeats from them and times out, and the script exits. It never gets past that line.

This is the exception that kills the executors:

java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) 
    at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) 
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426) 
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) 
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) 
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501) 
    at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) 
    ... 32 more 

Which, later on, causes the timeout:

ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 142918 ms 

I uploaded the INFO-level log file here. The DEBUG log is ~500 MB.

The build file and dependencies seem to be fine:

name := "tweeather" 

version := "1.0.0" 

scalaVersion := "2.11.7" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.6.0", 
    "org.apache.spark" %% "spark-mllib" % "1.6.0", 
    "org.apache.spark" %% "spark-streaming" % "1.6.0", 
    "org.apache.hadoop" % "hadoop-client" % "2.7.1", 
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(), 
    "org.twitter4j" % "twitter4j-stream" % "4.0.4", 
    "org.scalaj" %% "scalaj-http" % "2.0.0", 
    "com.jsuereth" %% "scala-arm" % "1.4", 
    "edu.ucar" % "grib" % "4.6.3" 
) 

dependencyOverrides ++= Set(
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4", 
    "org.scala-lang" % "scala-compiler" % scalaVersion.value, 
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", 
    "org.scala-lang.modules" %% "scala-xml" % "1.0.4", 
    "jline" % "jline" % "2.12.1" 
) 

resolvers ++= Seq(
    "Unidata Releases" at "http://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/" 
) 

Answers


I still don't know what the actual cause was, but I ran the script again with only a third of the input data, and it worked. It didn't fail anymore. From what I've observed, it crashes whenever I have more than 10,000 tasks.

I ended up coalescing my data (in another script) into 99 partitions. After I ran the script again, it computed everything successfully.
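
For reference, a minimal sketch of that workaround (the input path, sqlc and logInfo come from the question above; the output path is a placeholder, and only the coalesce into 99 partitions is from this answer):

// Check how many partitions (and therefore tasks) the dataset produces,
// then merge them down so fitting the pipeline spawns far fewer tasks.
val emoData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet")
logInfo(s"Partitions before: ${emoData.rdd.partitions.length}")

emoData
    .coalesce(99) // merges partitions without a full shuffle
    .write
    .parquet("/tw/sentiment/emo/parsed/data_coalesced.parquet") // placeholder output path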


More info here: https://issues.apache.org/jira/browse/SPARK-12675 – aluxian


It could be a memory issue: your partitions may be using a lot of memory that Spark cannot allocate. Check whether there is another reference to it in the logs. –


I had the same problem, but my job didn't crash. It threw the error but finished the work anyway. So this looks like a locking issue.

When I bumped the configuration up to use 2 procs (local[2]), it went away. So you may have more tasks in flight than your procs can handle.
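
For illustration, a minimal sketch of that configuration change (the app name is taken from the build file above; the rest is a generic SparkConf setup):

import org.apache.spark.{SparkConf, SparkContext}

// Run with two local worker threads instead of one, so a single
// long-running task cannot monopolize the only available core.
val conf = new SparkConf()
    .setAppName("tweeather")
    .setMaster("local[2]") // "local" or "local[1]" gives a single thread

val sc = new SparkContext(conf)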
