2016-01-28 32 views
2

Spark SQL DataFrame join with renaming in a loop

I am trying to build a transitive closure as a DataFrame. After a few iterations I get an internal Spark exception. Any ideas what causes it and how to fix it? Here is my program:

// Edge list of a simple chain graph; toDF() needs the SQLContext
// implicits, which spark-shell imports by default.
val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19))
var edges = e.map(p => Edge(p._1, p._2)).toDF()

// Seed: all edges starting at vertex 1, renamed so the columns do not
// clash with edges' own "start"/"end" in the join below.
var filtered = edges
    .filter("start = 1")
    .distinct()
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd")

var i = 0
while (i < 30) {
    i = i + 1
    println("\n i = " + i)
    // Extend each known path by one edge. After the select the columns
    // are "fStart" and "end", so only "end" needs renaming.
    filtered = filtered
        .join(edges, filtered("fEnd") === edges("start"))
        .select(filtered("fStart"), edges("end"))
        .withColumnRenamed("end", "fEnd")
        .distinct()
    filtered.show()
}

It requires a simple case class defined at the top level:

case class Edge(start: Int, end: Int) 

and here is the output with the exception, after which Spark hangs for a while and then exits with the error Executor heartbeat timed out:

i = 1 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|  1| 4| 
+------+----+ 


i = 2 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|  1| 5| 
+------+----+ 


i = 3 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|  1| 6| 
+------+----+ 
... 

i = 10 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|  1| 13| 
+------+----+ 


i = 11 
16/01/29 00:28:59 ERROR Utils: Uncaught exception in thread driver-heartbeater 
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) 
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) 
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468) 
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) 
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) 
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501) 
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) 
... 32 more 
+------+----+ 
|fStart|fEnd| 
+------+----+ 
|  1| 14| 
+------+----+ 
... 

PS1. How can this kind of join be done without the column renaming? PS2. Is there any more documentation on how to work with DataFrames? The API docs are very terse.
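For PS1, one option would be to alias the two DataFrames and qualify the otherwise ambiguous columns, so no renaming is needed. This is only a sketch, not from the original post, and assumes the sqlContext implicits (for the $"..." column syntax) are in scope as in spark-shell:

var closure = edges.filter("start = 1").distinct()
var j = 0
while (j < 30) {
    j = j + 1
    // "f" and "e" are aliases; the selected columns keep their original
    // names "start" and "end", so the next iteration joins the same way.
    closure = closure.as("f")
        .join(edges.as("e"), $"f.end" === $"e.start")
        .select($"f.start", $"e.end")
        .distinct()
    closure.show()
}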

+1

Try adding a cache at the end of each iteration: 'filtered.cache()' before you call 'show' – Niemand

Answer

1

These errors seem to appear only when the cluster does not have enough resources to serve the requests: the backlog keeps growing, and after a while errors of this kind start to show up.

To resolve your problem, add 'filtered.cache' before 'filtered.show'.
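A minimal sketch of that change against the loop above ('cache' is lazy, so the 'show' right after it is what actually materializes and stores the rows):

filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("end", "fEnd")
    .distinct()
    .cache()       // keep this iteration's result in memory
filtered.show()    // materializes the cached rows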

Also, there will be no results after the 16th iteration, because nothing matches 'filtered.fEnd === edges.start' any more.
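Since the result is known to go empty, the loop could also stop early. A hypothetical guard (the 'head(1)' check and the forced exit are illustrative additions, not part of the answer), placed right after 'filtered' is recomputed:

if (filtered.head(1).isEmpty) {
    println("frontier is empty after iteration " + i + "; stopping")
    i = 30  // force the while condition to fail
}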

+0

Indeed, adding 'filtered.cache' helped (thank you). I see that after some iterations the program keeps looping without doing anything, just printing this error. What I don't understand is how resources can be lacking: this is a very simple example with DataFrames of a few rows, several of them empty. Also, why does caching help here? – user2038119

+0

Also, after adding 'filtered.cache' and running the loop for more iterations, I can see that each iteration takes longer than the one before. Why is that? After a while they all produce the same empty DataFrame. PS. @Sumit, could you please point me to some documentation or YouTube videos? – user2038119

+0

The DAG can point you to the exact cause, but it appears that each new iteration also reprocesses all the previous ones. For example at 'i = 2' it will process the data of 'i = 1' and then the data of 'i = 2'. Remember that Spark persists the transformations to be applied to the data (_data lineage_), not the results/outputs of those transformations. To keep the output you specifically need to call 'RDD.cache'. – Sumit
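To illustrate the lineage point with code: 'cache' avoids recomputing earlier iterations, but the query plan produced by the repeated joins still grows with every round, which would explain the slowdown. A commonly suggested workaround, shown here as an untested sketch, is to rebuild the DataFrame from its RDD each iteration so the accumulated plan is cut off:

// Round-trip through the RDD: the new DataFrame starts from a flat plan
// instead of the whole chain of joins built up so far.
filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema).cache()
filtered.count()  // materialize now so the old plan is never replayed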