2

I am trying to query data from a database, apply some transformations to it, and save the new data on HDFS in Parquet format. I get an out-of-memory error when writing the Spark DataFrame out as Parquet.

Since the database query returns a very large number of rows, I fetch the data in batches and run the process above on each incoming batch.

Update 2: The batching logic is:

import scala.collection.JavaConverters._ 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class Batch(rows: List[String], 
      sqlContext: SQLContext) { 

     // The actual schema has around 60 fields 
     val schema = StructType(Array("name", "age", "address").map(field =>
         StructField(field, StringType, true)
        ))

     val transformedRows = rows.map(row => {

       // transformation logic (returns Array[Array[String]] type) 

      }).map(row => Row.fromSeq(row.toSeq)) 

     val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema) 

} 

val sparkConf = new SparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sparkContext) 

// Code to query database 
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next 

var batch_num = 0 

while (queryResponse.hasNext) { 
    val batch = queryResponse.next 

    val batchToSave = new Batch(
          batch.toList.map(_.getDocument.toString), 
          sqlContext) 

    batchToSave.dataframe.write.parquet(batch_num + "_Parquet") 

    batch_num += 1 

} 

My Spark version is 1.6.1 and the spark-submit command is:

spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar 

The problem is that after a certain number of batches I get a java.lang.OutOfMemoryError.

The full stack trace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at scala.StringContext.standardInterpolator(StringContext.scala:125) 
    at scala.StringContext.s(StringContext.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

I tried coalescing the data into a single partition, but it made no difference:

dataframe.coalesce(1).write.parquet(batch_num + "_Parquet") 

Any help would be appreciated.

Update 1

Not doing the coalesce as an RDD transformation still gives an error, but with the stack trace below. It seems to be an issue with Parquet.

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90) 
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229) 
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131) 
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178) 
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203) 
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56) 
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183) 
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99) 
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
+1

No no no no no no, do not coalesce - that is the opposite of what you want to do. – GameOfThrows

+0

[How to deal with "java.lang.OutOfMemoryError: Java heap space" error (64MB heap size)](http://stackoverflow.com/questions/37335/how-to-deal-with-java-lang-outofmemoryerror-java-heap-space-error-64mb-heap) –

+0

If your data (across all partitions, since you are coalescing it) is larger than the available memory, you will get an OOM. In general, coalescing to a single partition is bad practice because it takes away most of your scalability. –
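
As a minimal sketch of the alternative the comment suggests, the write can keep several output partitions instead of collapsing to one; the partition count of 8 is purely illustrative, and batchToSave/batch_num are the values from the question's loop:

// Keep multiple partitions when writing instead of coalescing to one. 
// The count 8 is only an example; tune it to your data and cluster. 
batchToSave.dataframe 
    .repartition(8) 
    .write.parquet(batch_num + "_Parquet") 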

Answer

2

I had the same problem today. It turned out that my execution plan was rather complex, and toString on it generated 150 MB of information; combined with Scala's string interpolation, this caused the driver to run out of memory.
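
A quick way to check whether the plan text itself is the problem is to measure it on the driver. This is a minimal sketch, assuming the batchToSave value from the question's loop; dataframe.queryExecution.toString is the same plan description being built in the first stack trace above.

// Minimal sketch: measure how large the textual query plan is on the driver. 
// Assumes `batchToSave` from the question's while loop. 
val planText = batchToSave.dataframe.queryExecution.toString 
println(s"Plan string length: ${planText.length} characters") 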

You can try increasing the driver memory (I had to double mine from 8 GB to 16 GB).
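
For example, the driver memory can be set on the spark-submit command line; this is a sketch of the question's own submit command with the flag added (16g is simply the value that worked in my case):

spark-submit --driver-memory 16g target/scala-2.10/Spark\ Application-assembly-1.0.jar 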
