Spark: "Error writing stream to file /XXX/stderr" with java.io.IOException: Stream closed. My code is as follows:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount extends App { 
    val conf = new SparkConf().setAppName("WordCount").setMaster("spark://sparkmaster:7077") 
    val sc = new SparkContext(conf) 
    sc.addJar("/home/found/FromWindows/testsSpark/out/artifacts/untitled_jar/untitled.jar") 
    val file = sc.textFile("hdfs://192.168.1.101:9000/Texts.txt") 
    val count = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) 
    val res = count.collect() 
    res.foreach(println(_)) 
} 

When I run it in local mode, everything is OK. However, when I run it on the cluster, it crashes. I get the following error messages:

1. Error message from the IntelliJ console:

16/04/24 19:17:34 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 192.168.1.101): java.lang.ClassNotFoundException: org.klordy.test.WordCount$$anonfun$2 
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
... 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:744) 
16/04/24 19:17:34 INFO TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) on executor 192.168.1.101: java.lang.ClassNotFoundException (org.klordy.test.WordCount$$anonfun$2) [duplicate 1] 
16/04/24 19:17:34 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 2, 192.168.1.101, ANY, 2133 bytes) 
16/04/24 19:17:34 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 3, 192.168.1.101, ANY, 2133 bytes) 
16/04/24 19:17:34 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 3) on executor 192.168.1.101: java.lang.ClassNotFoundException (org.klordy.test.WordCount$$anonfun$2) [duplicate 2] 
16/04/24 19:17:34 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 4, 192.168.1.101, ANY, 2133 bytes) 
16/04/24 19:17:34 ERROR TaskSetManager: Task 1 in stage 0.0 failed 4 times; aborting job 
16/04/24 19:17:34 INFO TaskSchedulerImpl: Cancelling stage 0 
16/04/24 19:17:34 INFO TaskSchedulerImpl: Stage 0 was cancelled 
16/04/24 19:17:34 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:17) failed in 2.904 s 
16/04/24 19:17:34 INFO DAGScheduler: Job 0 failed: collect at WordCount.scala:18, took 3.103414 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 192.168.1.101): java.lang.ClassNotFoundException: org.klordy.test.WordCount$$anonfun$2 
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
... 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:744) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 
Caused by: java.lang.ClassNotFoundException: org.klordy.test.WordCount$$anonfun$2 
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
... 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:744) 
16/04/24 19:17:34 INFO SparkContext: Invoking stop() from shutdown hook 
16/04/24 19:17:34 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on executor 192.168.1.101: java.lang.ClassNotFoundException (org.klordy.test.WordCount$$anonfun$2) [duplicate 7] 
16/04/24 19:17:34 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/04/24 19:17:34 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:45350/user/Executor#2034500302]) with ID 0 
16/04/24 19:17:34 INFO SparkUI: Stopped Spark web UI at http://192.168.1.101:4040 
16/04/24 19:17:34 INFO DAGScheduler: Stopping DAGScheduler 
16/04/24 19:17:34 INFO SparkDeploySchedulerBackend: Shutting down all executors 
16/04/24 19:17:34 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 
16/04/24 19:17:34 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 

2. Message from the Spark log:

16/04/24 19:17:27 INFO Worker: Asked to launch executor app-20160424191727-0005/1 for WordCount 
16/04/24 19:17:27 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (1.327376 ms) AkkaMessage(LaunchExecutor(spark://sparkworker:7077,app-20160424191727-0005,1,ApplicationDescription(WordCount),2,1024),false) from Actor[akka://sparkWorker/deadLetters] 
16/04/24 19:17:27 INFO SecurityManager: Changing view acls to: root 
16/04/24 19:17:27 INFO SecurityManager: Changing modify acls to: root 
16/04/24 19:17:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 
16/04/24 19:17:27 DEBUG SSLOptions: No SSL protocol specified 
16/04/24 19:17:27 DEBUG SSLOptions: No SSL protocol specified 
16/04/24 19:17:27 DEBUG SSLOptions: No SSL protocol specified 
16/04/24 19:17:27 DEBUG SecurityManager: SSLConfiguration for file server: SSLOptions{enabled=false, keyStore=None, keyStorePassword=None, trustStore=None, trustStorePassword=None, protocol=None, enabledAlgorithms=Set()} 
16/04/24 19:17:27 DEBUG SecurityManager: SSLConfiguration for Akka: SSLOptions{enabled=false, keyStore=None, keyStorePassword=None, trustStore=None, trustStorePassword=None, protocol=None, enabledAlgorithms=Set()} 
16/04/24 19:17:27 INFO ExecutorRunner: Launch command: "/usr/java/jdk1.7.0_51/bin/java" "-cp" "/usr/local/Spark/spark-1.5.1-bin-hadoop2.4/sbin/../conf/:/usr/local/Spark/spark-1.5.1-bin-hadoop2.4/lib/spark-assembly-1.5.1-hadoop2.4.0.jar:/usr/local/Spark/spark-1.5.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/Spark/spark-1.5.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/Spark/spark-1.5.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/Spark/hadoop-2.5.2/etc/hadoop/" "-Xms1024M" "-Xmx1024M" "-Dspark.driver.port=49473" "-XX:MaxPermSize=256m" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "akka.tcp://[email protected]:49473/user/CoarseGrainedScheduler" "--executor-id" "1" "--hostname" "192.168.1.101" "--cores" "2" "--app-id" "app-20160424191727-0005" "--worker-url" "akka.tcp://[email protected]:49838/user/Worker" 
16/04/24 19:17:28 DEBUG FileAppender: Started appending thread 
16/04/24 19:17:28 DEBUG FileAppender: Opened file /usr/local/Spark/spark-1.5.1-bin-hadoop2.4/work/app-20160424191727-0005/1/stdout 
16/04/24 19:17:28 DEBUG FileAppender: Started appending thread 
16/04/24 19:17:28 DEBUG FileAppender: Opened file /usr/local/Spark/spark-1.5.1-bin-hadoop2.4/work/app-20160424191727-0005/1/stderr 
16/04/24 19:17:34 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(KillExecutor(spark://sparkworker:7077,app-20160424191727-0005,1),false) 
16/04/24 19:17:34 INFO Worker: Asked to kill executor app-20160424191727-0005/1 
16/04/24 19:17:34 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (0.172782 ms) AkkaMessage(KillExecutor(spark://sparkworker:7077,app-20160424191727-0005,1),false) from Actor[akka://sparkWorker/deadLetters] 
16/04/24 19:17:34 INFO ExecutorRunner: Runner thread for executor app-20160424191727-0005/1 interrupted 
16/04/24 19:17:34 INFO ExecutorRunner: Killing process! 
16/04/24 19:17:34 ERROR FileAppender: Error writing stream to file /usr/local/Spark/spark-1.5.1-bin-hadoop2.4/work/app-20160424191727-0005/1/stderr 
java.io.IOException: Stream closed 
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162) 
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:272) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334) 
    at java.io.FilterInputStream.read(FilterInputStream.java:107) 
    at org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70) 
    at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39) 
    at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) 
    at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) 
    at org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38) 
16/04/24 19:17:34 DEBUG FileAppender: Closed file /usr/local/Spark/spark-1.5.1-bin-hadoop2.4/work/app-20160424191727-0005/1/stderr 
16/04/24 19:17:34 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(ApplicationFinished(app-20160424191727-0005),false) from Actor[akka://sparkWorker/deadLetters] 
16/04/24 19:17:34 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(ApplicationFinished(app-20160424191727-0005),false) 
16/04/24 19:17:34 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (0.186628 ms) AkkaMessage(ApplicationFinished(app-20160424191727-0005),false) from Actor[akka://sparkWorker/deadLetters] 
16/04/24 19:17:35 DEBUG FileAppender: Closed file /usr/local/Spark/spark-1.5.1-bin-hadoop2.4/work/app-20160424191727-0005/1/stdout 
16/04/24 19:17:35 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(ExecutorStateChanged(app-20160424191727-0005,1,KILLED,None,Some(143)),false) from Actor[akka://sparkWorker/deadLetters] 
16/04/24 19:17:35 DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(ExecutorStateChanged(app-20160424191727-0005,1,KILLED,None,Some(143)),false) 
16/04/24 19:17:35 INFO Worker: Executor app-20160424191727-0005/1 finished with state KILLED exitStatus 143 
16/04/24 19:17:35 INFO Worker: Cleaning up local directories for application app-20160424191727-0005 

I'm really confused by this. Any ideas on how to solve it? My cluster environment is: Spark 1.5.1 / Hadoop 2.5.2 / Scala 2.10.4 / JDK 1.7.0_51.

+0

Are you sure 'sc.addJar("/home/found/FromWindows/testsSpark/out/artifacts/untitled_jar/untitled.jar")' points to the correct location of the file? You can check in the Spark logs whether it was able to pick the jar up on the driver and ship it to the workers. –

+0

I believe the location is correct. The code crashes when the RDD's collect() is invoked at 'val res = count.collect()'. – KLordy

+0

That's because that is when the RDD gets materialized. –
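A minimal sketch of that point, reusing the code from the question: transformations only record the RDD lineage, and nothing is shipped to the executors until an action runs, which is why the missing anonymous class is only reported once collect() is called.

    val file = sc.textFile("hdfs://192.168.1.101:9000/Texts.txt")                                // lazy: no job is started yet
    val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)  // still lazy: only the lineage is recorded
    val res = count.collect()  // action: task closures are serialized and sent to executors here, so the ClassNotFoundException surfaces at this line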

Answers

0

Problem solved. The root cause was that my project pulled in two versions of Scala (sbt had downloaded an extra one). Thanks to everyone who helped me work through this.
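For reference, a minimal sketch of how such a conflict is usually avoided in an sbt build; the settings below are illustrative and assume the Spark 1.5.1 / Scala 2.10.4 environment described in the question:

    // build.sbt (sketch): pin a single Scala version and let the cluster provide Spark itself
    name := "WordCount"
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"

sbt's built-in 'evicted' task can also help reveal a second scala-library version being pulled in transitively.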

0

My guess is that sc.addJar("/home/found/FromWindows/testsSpark/out/artifacts/untitled_jar/untitled.jar") fails on the executors because that path/jar does not exist on the executor nodes. Try putting the jar into HDFS and referring to it by its HDFS path so that your executors can access it.

Even better, use the --jars command-line option and specify the jar there; then you don't have to change your code and recompile whenever the jar changes.
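A minimal sketch of the first suggestion, assuming the jar has already been copied to HDFS (the path below is hypothetical):

    // Sketch only: point addJar at a location every executor can reach, instead of a driver-local path
    val conf = new SparkConf().setAppName("WordCount").setMaster("spark://sparkmaster:7077")
    val sc = new SparkContext(conf)
    sc.addJar("hdfs://192.168.1.101:9000/jars/untitled.jar")

With spark-submit, the application jar itself is shipped to the executors and extra dependencies can be listed via --jars, so the hard-coded addJar call becomes unnecessary.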

+0

What do you mean by a Hadoop path? I just tried moving my whole project to '/usr/local/Spark/untitled', and it still doesn't work. – KLordy

+0

Sorry, I should have said HDFS rather than Hadoop to be clearer; I've edited my answer. If you don't have HDFS, then you have to put the jar file in exactly the same location on every executor node, or, as I said, just use '--jars' with 'spark-submit'. –

+0

I just tried it that way and it still doesn't seem to work. TT – KLordy