
I am trying to run the LBFGS example from MLlib (https://spark.apache.org/docs/1.0.0/mllib-optimization.html#limited-memory-bfgs-l-bfgs) on a large dataset (~100GB) using DISK_ONLY persistence. I use a 16GB driver and 16GB executors, and I get a Spark OutOfMemoryError when adding executors.
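
For reference, a minimal sketch of the setup described above, written against the Spark 1.0.0 Java API; the input path and the use of MLUtils.loadLibSVMFile are my assumptions, not details from the original post:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.util.MLUtils;
    import org.apache.spark.storage.StorageLevel;

    SparkConf conf = new SparkConf().setAppName("LBFGS-example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Keep the ~100GB training set on disk only, so it is never cached in executor memory.
    JavaRDD<LabeledPoint> training = MLUtils
        .loadLibSVMFile(sc.sc(), "hdfs:///path/to/training-data") // hypothetical input path
        .toJavaRDD()
        .persist(StorageLevel.DISK_ONLY());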

When I use a small number of executors (10), everything runs smoothly. But when I try to use more executors (40), I get an OutOfMemoryError: Java heap space on the driver. I thought it might be related to the level of parallelism used (as suggested in https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism).

I tried setting spark.default.parallelism to large values (from 5000 to 15000), but I still get the same error, and the setting does not seem to be taken into account (each job has about 500 tasks), even though it shows up in the Environment tab.

I am using Spark 1.0.0 with Java on a YARN cluster. I set the default parallelism with SparkConf conf = new SparkConf().set("spark.default.parallelism", "15000");.
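
Note that spark.default.parallelism only sets the default partition count for shuffle operations when none is given; the number of tasks in stages that read the input RDD is determined by the input partitions, which may explain the ~500 tasks. A hedged sketch of forcing the partition count explicitly (the repartition call is an assumed workaround, not something from the original post):

    // The config value is picked up (it shows in the Environment tab)...
    SparkConf conf = new SparkConf().set("spark.default.parallelism", "15000");

    // ...but it does not repartition an RDD loaded from HDFS, so do it explicitly
    // (assumed workaround; "training" is the persisted RDD from the sketch above).
    JavaRDD<LabeledPoint> training15k = training.repartition(15000);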

Stack trace:

14/10/20 11:25:16 INFO TaskSetManager: Starting task 30.0:20 as TID 60630 on executor 17: a4-5d-36-fc-ef-54.hpc.criteo.preprod (PROCESS_LOCAL) 
14/10/20 11:25:16 INFO TaskSetManager: Serialized task 30.0:20 as 127544326 bytes in 227 ms 
14/10/20 11:25:59 INFO TaskSetManager: Starting task 30.0:68 as TID 60631 on executor 10: a4-5d-36-fc-9f-2c.hpc.criteo.preprod (PROCESS_LOCAL) 
14/10/20 11:25:59 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-5] shutting down ActorSystem [spark] 
java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOf(Arrays.java:2271) 
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) 
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) 
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) 
    at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) 
    at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) 
    at org.apache.spark.util.SerializableBuffer.writeObject(SerializableBuffer.scala:49) 
    at sun.reflect.GeneratedMethodAccessor98.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:145) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:143) 
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.launchTasks(CoarseGrainedSchedulerBackend.scala:143) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:131) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
14/10/20 11:25:59 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201 
14/10/20 11:25:59 INFO ApplicationMaster: finishApplicationMaster with FAILED 
14/10/20 11:25:59 INFO AMRMClientImpl: Waiting for application to be successfully unregistered. 
Exception in thread "Thread-4" java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187) 
Caused by: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638) 
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215) 
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) 
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) 
    at akka.actor.ActorCell.terminate(ActorCell.scala:338) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:218) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Any ideas why this error occurs and how I can fix it?


Could you provide more information about the cluster specification (e.g. 2x100GB servers) and a more detailed exception stack trace? Also, from what you describe you are launching 40x16GB executors. Is that much memory available? – 2014-10-20 13:54:39


I edited my post. I don't know the exact configuration of the cluster, but I have access to 40x16GB. – Belag 2014-10-20 14:11:13
