Spark streaming on Dataproc throws FileNotFoundException

When I try to submit a Spark streaming job to a Google Dataproc cluster, I get this exception:
16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
Full output here.
It seems this error occurs when the Hadoop configuration is not correctly defined in spark-env.sh — link1, link2.
Is that where it should be configured? Any pointers on how to fix it?
The same code works fine when run in local mode:
sparkConf.setMaster("local[4]")
For additional context: the job is invoked like this:
gcloud dataproc jobs submit spark \
    --cluster my-test-cluster \
    --class com.company.skyfall.Skyfall \
    --jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
    --properties spark.ui.showConsoleProgress=false
Here is the boilerplate setup code:
lazy val conf = {
  val c = new SparkConf().setAppName(this.getClass.getName)
  // Randomize the UI port so several drivers on one host don't collide
  c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)
  if (isLocal) c.setMaster("local[4]")
  c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  c.set("spark.streaming.blockInterval", "1s")
}
lazy val ssc = if (checkPointingEnabled) {
  // Recover from an existing checkpoint, or build a fresh context if none exists
  StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
} else {
  createStreamingContext()
}

private def getCheckPointDirectory: String = {
  if (isLocal) localCheckPointPath else checkPointPath
}

private def createStreamingContext(): StreamingContext = {
  val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
  s.checkpoint(getCheckPointDirectory)
  s
}
Thanks in advance.
Thanks Dennis, cleaning up the checkpoint worked. So, to summarize, to recover from a checkpoint: 1. add the jar to the local filesystem and reference it in the spark job submission; 2. restart the dataproc agent before recovering from the checkpoint, otherwise it throws various exceptions (https://gist.github.com/rvenkatesh25/a36e6f339febbf0a6d3b96ce5ad08fdc). Does that sound right? –
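Put together, the two recovery steps above might look roughly like this (a sketch only: the bucket, cluster, and class names are the ones from the question, and `google-dataproc-agent` as the agent's service name is an assumption that may vary by Dataproc image version):

```shell
# 1. Stage the assembly jar on the master node's local filesystem
#    (bucket and jar name taken from the question above).
gsutil cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
    /tmp/skyfall-assembly-0.0.1.jar

# 2. Restart the Dataproc agent once before the first recovery run
#    (service name is an assumption; check your image version).
sudo systemctl restart google-dataproc-agent

# 3. Resubmit, referencing the jar by its local path instead of gs://
gcloud dataproc jobs submit spark \
    --cluster my-test-cluster \
    --class com.company.skyfall.Skyfall \
    --jars file:/tmp/skyfall-assembly-0.0.1.jar \
    --properties spark.ui.showConsoleProgress=false
```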
To clarify, you only need to restart the dataproc agent the first time you resume from a given checkpoint; all subsequent runs will work without a restart. Cleaner handling of this in Dataproc is a work in progress, so admittedly needing to restart the agent isn't ideal. Basically, the first time you run the job without a checkpoint, Dataproc's "jobid" gets baked into the checkpoint files, and all subsequent checkpoint-recovery runs reuse that old jobid. The one-time restart makes the agent forget the jobid from that first job run. –
Even with the dataproc agent restart trick, I still get the following exception: "17/05/16 17:39:04 ERROR org.apache.spark.SparkContext: Error initializing SparkContext. org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)". Has anything changed on the dataproc side? –