2016-08-17 71 views
3

我们试图提交一个spark工作(spark 2.0,hadoop 2.7.2),但由于某种原因,我们在EMR中收到了一个相当神秘的NPE。作为一个scala程序,一切都运行得很好,所以我们不确定是什么原因造成了这个问题。这里的堆栈跟踪:作为spark工作提交时,Spark RDD映射中的NullPointerException

18:02:55271 ERROR utils的:91 - 中止任务 显示java.lang.NullPointerException 在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $(来源不明) 在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext(来源不明) 在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp(WriterContainer.scala:253) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $ $ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252) at org.apache。 spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) at org.apache.spark.sql.execution .datasources.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasource s .InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor。 runWorker(ThreadPoolExecutor.java:1142) 在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

据我们所知,这种情况发生在以下方法中:

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.map(row => 
     "text|label" 
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

我们已经收窄,到地图的功能,因为这时候的火花作业提交作品:

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

没有人有任何想法可能会造成这个问题?另外,我们如何解决它?我们很难过。

+0

你没试过'coalesce()'吗? – gsamaras

+0

@gsamaras不!但它似乎没有合并就行。这里发生了什么? – cscan

回答

5

我想你会得到一个NullPointerException当工作人员试图访问只存在于驱动程序而不是工作者的SparkContext对象时抛出的。

coalesce()重新分区您的数据。当你只请求一个分区时,它会试图挤压全部数据在一个分区*。这可能会给应用程序的内存足够大带来压力。

一般来说,最好不要只将分区缩小到1。

欲了解更多,请阅读:Spark NullPointerException with saveAsTextFilethis


+0

我们使用coalesce(1)的原因是将所有数据写入单个文件而不是多个文件。有没有其他方法可以实现这一点? – cscan

+0

@cscan no。也许增加你的内存设置可以让你的应用程序使用1分区,但是我发布的错误并不表示这样的事情。为什么你希望他们在1个文件中有一个原因吗? – gsamaras

+1

当我们只用五条记录进行测试时发生了这个错误 - 我不认为它与内存使用有关。 – cscan