2017-02-05

Spark job gives NullPointerException on Amazon EMR

Goal: I am trying to run a query on a Parquet file (read from S3) and then write the result out as a single tab-separated text file in another S3 bucket. All of this is done in a Spark application running on an Amazon EMR cluster.

I have read other similar questions on StackOverflow, but to no avail.

//Read the parquet file 
final Dataset<Row> df = spark.read().parquet(fileName); 
//Register a temp table for convenience 
df.registerTempTable(tableName); 

//Valid SQL query string - verified using Databricks on the same parquet file 
final String queryString = //SQL String 

//Result of running the SQL query on the parquet file 
final Dataset<Row> result = spark.sql(queryString); 

//Check if result is null - result is NOT NULL 

final JavaRDD<Row> rowJavaRDD = result.toJavaRDD(); 

//Check if rowJavaRDD is null - rowJavaRDD is NOT NULL 

//Coalesce and write to a text file 
rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath);
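For reference (and assuming the standard Scala Row.mkString semantics), the map step above turns each Row into one tab-separated line. A minimal pure-Java sketch of that join - using a hypothetical TabLine helper, no Spark dependency - shows the expected output format:

```java
import java.util.List;
import java.util.stream.Collectors;

public class TabLine {
    // Joins field values with tabs, mimicking Row.mkString("\t") in Spark.
    public static String mkString(List<Object> fields) {
        return fields.stream()
                .map(String::valueOf)
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        // prints the three fields separated by tab characters
        System.out.println(mkString(List.of("id-1", 42, true)));
    }
}
```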

But I am getting an NPE on the line rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath).

I have also tried using broadcast, but that did not change anything. I tried without the coalesce and still get the same error.

I have checked that savePath is valid and that there are no S3 permission issues.

I tried doing the same thing locally in ./spark-shell with Scala, using the same Parquet file, and it works fine :/

Running Spark 2.0.2 on the EMR cluster. [Version 2.1 gives a ClassCastException on something else - so upgrading is a bigger hassle.]

Any help would be appreciated.

The stack trace follows:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 38.0 failed 4 times, most recent failure: Lost task 
7.3 in stage 38.0 (TID 13586, ip-10-30-1-150.ec2.internal): java.lang.NullPointerException 
     at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:231) 
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1203) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) 
     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
     at org.apache.spark.scheduler.Task.run(Task.scala:86) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
     at scala.Option.foreach(Option.scala:257) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438) 
     at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:549) 
     at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) 

Related links:

1) NPE saveAsTextFile

2) RDD as textfile

Answers

As the first related link you provided yourself mentions (NPE saveAsTextFile), the fact that the exception occurs when you call saveAsTextFile() does not mean the problem has anything to do with that line, because all the transformations performed up to that point are executed lazily.
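This laziness can be reproduced without Spark at all: Java streams behave the same way. In this sketch (plain JDK, no Spark), an NPE planted in a map() step only surfaces at the terminal operation:

```java
import java.util.stream.Stream;

public class LazyDemo {
    // Java streams, like Spark transformations, are lazy: map() only
    // records the operation; nothing runs until a terminal operation.
    public static String run() {
        StringBuilder log = new StringBuilder();
        Stream<String> s = Stream.of("a", null, "b")
                .map(String::toUpperCase); // would NPE on null, but not yet
        log.append("no exception after map; ");
        try {
            // The terminal operation (analogous to saveAsTextFile) forces
            // evaluation, and only then does the NPE surface.
            s.forEach(x -> log.append(x).append(' '));
        } catch (NullPointerException e) {
            log.append("NPE surfaced at the terminal operation");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the same way, a stack trace blaming saveAsTextFile really points at whichever earlier step produced the bad value.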

A quick Google search for "OnHeapColumnVector NullPointerException" turned up SPARK-16518 for me. It looks like this issue has existed since 2.0.0 and is still unresolved.
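A commonly reported workaround for NPEs inside OnHeapColumnVector (an assumption here - it may or may not apply to this particular file) is to disable the vectorized Parquet reader, which falls back to the older row-based reader:

```
# spark-defaults.conf (or pass via --conf on spark-submit)
# Fall back to the non-vectorized Parquet reader
spark.sql.parquet.enableVectorizedReader false
```

The same setting can be applied at runtime with spark.conf().set("spark.sql.parquet.enableVectorizedReader", "false") before reading the file; expect some loss of read performance.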


I have used the same commands on other Parquet files and they work fine. It is only with this file that I get the NPE. I downloaded the problematic Parquet file locally and tried the same commands - and they work fine ://


It looks like org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt is hitting a null column (due to a bug). Check whether int32 appears in your Parquet file's schema. See another variant of this issue here: https://issues.apache.org/jira/browse/HIVE-14294

Also, please share the schema of the Parquet file.
