
Spark: join very slow, then fails with "Slave lost"

I joined two DataFrames on a common column and then ran the show method:

    df = df1.join(df2, df1.col1 == df2.col2, 'inner')
    df.show()

The join then ran very slowly and eventually raised an error: Slave lost.

Py4JJavaError: An error occurred while calling o109.showString. 

    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 : ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost 

Driver stacktrace: at

org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745)

After some searching, it seems this is a memory-related issue. I then increased the repartition count to 3000 and increased the executor memory and memory overhead, but still no luck - I got the same "Slave lost" error. During df.show() I noticed that one executor's shuffle write size was very high, while the others' were not. Any clues? Thanks.
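For reference, the tuning described above would look roughly like this in PySpark; the memory values and the YARN overhead setting are illustrative assumptions, not the exact configuration used:

    # Sketch of the tuning steps described above (illustrative values, not the actual config):
    #   spark-submit --executor-memory 8g \
    #     --conf spark.yarn.executor.memoryOverhead=2048 ...   # overhead setting applies if running on YARN
    df1 = df1.repartition(3000)   # spread the data over more partitions before the join
    df2 = df2.repartition(3000)
    df = df1.join(df2, df1.col1 == df2.col2, 'inner')
    df.show()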


Sounds like the data might be skewed - how many rows are there in each of the two DataFrames? Also, what instance type are you running on and how much memory have you allocated? Could you try doing a 'count' after the 'join' instead of a 'show'? –


@GlennieHellesSindholt Yes, the count went through. One DataFrame is about 100 times larger than the other; the smaller DF is about 6M. I'm using Spark 1.6 on EC2. – newleaf


I suspected as much. I'm guessing that if you did df = df1.join(df2, df1.col1 == df2.col2, 'inner').persist(StorageLevel.MEMORY_AND_DISK) followed by a df.count, followed by a df.show, it would probably go through as well, right? –
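In PySpark, that suggestion would look roughly like this (the StorageLevel import is spelled out here; the column names are taken from the question):

    from pyspark import StorageLevel

    df = df1.join(df2, df1.col1 == df2.col2, 'inner') \
            .persist(StorageLevel.MEMORY_AND_DISK)
    df.count()   # materializes the join and caches the result to memory/disk
    df.show()    # reads from the persisted result instead of recomputing the join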

Answer


If using Scala:

val df = df1.join(df2, Seq("column name")) 

If using PySpark:

df = df1.join(df2, ["columnname"]) 

df = df1.join(df2, df1.columnname == df2.columnname) 
display(df) 
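Note that the list form (joining on ["columnname"]) keeps a single copy of the join column in the result, while the expression form keeps both df1.columnname and df2.columnname, so you may want to drop one afterwards. Also, display(df) is a Databricks notebook helper; in plain PySpark you would use df.show() instead.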

If trying to do the same with PySpark SQL:

df1.createOrReplaceTempView("left_test_table") 
df2.createOrReplaceTempView("right_test_table") 
left = sqlContext.sql("SELECT * FROM left_test_table") 
right = sqlContext.sql("SELECT * FROM right_test_table") 

left.join(right, left.name == right.name).drop(left.name).show()
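Here name just stands in for the common join column from the example; dropping left.name removes the duplicate copy of the join column contributed by the left table, so it appears only once in the result.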