
I have a problem with Apache Spark. I am running in yarn-client mode, and PySpark cannot connect to the executors.

This is my configuration:

conf.set("spark.executor.memory", "4g") 
conf.set("spark.driver.memory", "4g") 
conf.set("spark.driver.cores", "4") 
conf.set("spark.default.parallelism", "3") 
conf.set("spark.executor.cores", "2") 
conf.set("spark.num.executor", "8") 
conf.set("spark.shuffle.io.maxRetries", "20") 

This is my code:

I have two DataFrames, df and other_df. I first inner-join the two DataFrames to filter the data based on ID. Then I aggregate to compute the mean CA per month (df_agg). Finally I want to collect() or take(12) the result to plot a graph. The error occurs when I call collect().

#--- inner join to select the right segment ---#
new_df = other_df.join(df, df.ID == other_df.ID)

#--- aggregate the mean CA per month ---#
df_agg = new_df.groupBy("month").avg("CA")

#--- collect() the (month, avg(CA)) rows to the driver ---#
data = df_agg.collect()
x, y = zip(*data)
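(For completeness, the plotting step described above might look like the sketch below; matplotlib and the sort-by-month step are assumptions, since the question does not say how the graph is drawn.)

# Hypothetical plotting step, assuming matplotlib; not part of the original post.
import matplotlib.pyplot as plt

x, y = zip(*sorted(data))  # sort by month so the line is drawn in order
plt.plot(x, y)
plt.xlabel("month")
plt.ylabel("mean CA")
plt.show()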

I got this error message from Spark:

Py4JJavaError: An error occurred while calling o824.javaToPython. 
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
..... 

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) 
    at org.apache.spark.sql.DataFrame.javaToPython(DataFrame.scala:1582) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
...... 
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141) 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) 
    ... 27 more 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69) 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) 
    ... 35 more 

And these are the error messages in the Spark UI:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to <executor> 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 

I have trouble reading the error logs. I tried increasing the number of executors (up to 8) and increasing shuffle.io.maxRetries (up to 20). I also read this post, which was helpful, but I have a hard time understanding which parameters to tune.

Especially since sometimes the job succeeds and the next time it fails...

I then changed the configuration to:

conf.set("spark.yarn.executor.memoryOverhead", "600") 

But it did not work either.
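For context: the innermost cause in the first trace is "Futures timed out after [300 seconds]" inside BroadcastHashJoin.doExecute, i.e. the driver gave up waiting for the broadcast side of the join. A hedged workaround, assuming the broadcast itself is what stalls (this was not attempted in the original post), is to disable automatic broadcast joins so Spark falls back to a shuffle join:

# Sketch of a possible workaround, not tried in the original post:
# a negative threshold disables automatic broadcast joins entirely.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")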

Does anyone have a clue about this problem?

Thanks

Answer


You can set spark.sql.broadcastTimeout to a value higher than its default of 300 seconds.
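This matches the "Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]" line in the trace: 300 seconds is the default of spark.sql.broadcastTimeout, the time the driver waits for a broadcast table in a broadcast join. A minimal sketch of how to raise it (1200 is an arbitrary example value, not one given in the answer):

# Either set it at startup through SparkConf...
conf.set("spark.sql.broadcastTimeout", "1200")  # value is in seconds

# ...or on an already-created SQLContext.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")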


Thanks. Would you mind elaborating on your answer? I cannot find spark.sql.broadcastTimeout in the Spark documentation... – shad
