NullPointerException in Spark RDD map when submitted as a spark job

We're trying to submit a Spark job (Spark 2.0, Hadoop 2.7.2), but for some reason we're getting a rather cryptic NPE in EMR. Everything runs fine as a Scala program, so we're not sure what's causing the issue. Here's the stack trace:
18:02:55,271 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
As far as we can tell, this happens in the following method:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
    "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
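One possible workaround worth sketching (this is an assumption on our part, not something confirmed above): in Spark 2.0, `DataFrame.map` is a Dataset operation, so it goes through the encoder and whole-stage-codegen machinery that appears in the stack trace (`GeneratedClass$GeneratedIterator`). Dropping to the underlying RDD with `.rdd` bypasses that generated-code path entirely. A minimal sketch, keeping the same placeholder string `"text|label"` from the snippet above:

```scala
import org.apache.spark.sql.DataFrame

// Sketch of a possible workaround (an assumption, not a confirmed fix):
// map over the underlying RDD instead of the Dataset, so the job never
// enters the codegen iterator seen in the stack trace.
def process(dataFrame: DataFrame, s3Bucket: String): Unit = {
  dataFrame.rdd                    // RDD[Row]: no encoder, no whole-stage codegen
    .map(row => "text|label")      // same placeholder string as in the question
    .coalesce(1)
    .saveAsTextFile(s3Bucket)      // RDD counterpart of write.text; note it has
                                   // no SaveMode.Overwrite, so the path must not exist
}
```

Note the trade-off: `saveAsTextFile` will fail if the output path already exists, so the `SaveMode.Overwrite` behavior of the original would have to be handled separately (e.g. by deleting the S3 prefix first).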
We've narrowed it down to the map function, because this version works when submitted as a spark job:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Does anyone have any idea what might be causing this? Also, how can we work around it? We're stumped.
Have you tried it without 'coalesce()'? – gsamaras
@gsamaras we haven't! It does seem to work without the coalesce, though. What's going on here? – cscan