2016-12-13 79 views
0

当我尝试提交火花数据流作业,谷歌dataproc集群中,我得到这个异常:星火流上dataproc抛出FileNotFoundException异常

16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext. 
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist 
     at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611) 
     at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) 
... 
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040} 
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered! 
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main 
java.lang.NullPointerException 
     at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152) 
     at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360) 
... 
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist 
     at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611) 
     at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) 

全输出here

看来当Hadoop配置不正确spark-env.sh定义发生这个错误 - link1link2

是它配置的地方吗?任何关于如何解决它的指针?

运行在本地模式相同的代码工作正常:

sparkConf.setMaster("local[4]") 

有关其他方面:作业已调用是这样的:

gcloud dataproc jobs submit spark \ 
--cluster my-test-cluster \ 
--class com.company.skyfall.Skyfall \ 
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \ 
--properties spark.ui.showConsoleProgress=false 

这是样板设置代码:

lazy val conf = { 
    val c = new SparkConf().setAppName(this.getClass.getName) 
    c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString) 

    if (isLocal) c.setMaster("local[4]") 
    c.set("spark.streaming.receiver.writeAheadLog.enable", "true") 
    c.set("spark.streaming.blockInterval", "1s") 
    } 

    lazy val ssc = if (checkPointingEnabled) { 
    StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext) 
    } else { 
    createStreamingContext() 
    } 

    private def getCheckPointDirectory: String = { 
    if (isLocal) localCheckPointPath else checkPointPath 
    } 

    private def createStreamingContext(): StreamingContext = { 
    val s = new StreamingContext(conf, Seconds(batchDurationSeconds)) 
    s.checkpoint(getCheckPointDirectory) 
    s 
    } 

在此先感谢

回答

1

这是否有可能不是第一次使用给定的检查点目录运行作业,因为检查点目录中已经包含检查点了?

这是因为检查点硬编码的确切jar文件参数用于提交纱线的应用,并与--jars标志指向GCS上Dataproc运行时,这是Dataproc自动从GCS举办的jar文件到实际的语法糖本地文件路径/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar仅在单个作业运行期间暂时使用,因为Spark无法直接从GCS调用jar文件而无需在本地进行暂存。

但是,在后续作业中,以前的tmp jarfile已被删除,但新作业试图将旧位置硬编码到检查点数据中。

由于检查点数据中的硬编码导致还有其他问题;例如,Dataproc也使用YARN“标签”来跟踪作业,并且如果旧的Dataproc作业的“标签”在新的YARN应用程序中重复使用,则会与YARN发生冲突。要运行您的流媒体应用程序,您需要先明确你的关卡目录,如果可以从一张白纸开始,然后:

  1. 必须某个地方工作jar文件的主节点上开始作业前,然后你的“--jar”标志必须指定“file:///path/on/master/node/to/jarfile.jar”。

当您指定“file:///”路径时,dataproc知道它已经在主节点上,因此它不会重新进入/ tmp目录,因此在这种情况下,检查点安全指向主服务器上的一些固定本地目录。

你可以做到这一点要么被初始化动作也可以提交一个快速的猪的工作(或只是ssh到主站和下载jar文件):

# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case) 
gcloud dataproc jobs submit pig --cluster my-test-cluster \ 
    --execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar" 

# Submit the first attempt of the job 
gcloud dataproc jobs submit spark --cluster my-test-cluster \ 
    --class com.company.skyfall.Skyfall \ 
    --jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \ 
    --properties spark.ui.showConsoleProgress=false 
  • Dataproc依靠引擎盖下的spark.yarn.tags来跟踪与作业相关的YARN应用程序。但是,检查点包含一个陈旧的spark.yarn.tags,它会导致Dataproc与似乎与旧作业关联的新应用程序混淆。
  • 现在,只要最近杀死的jobid保存在内存中,它只会“清除”可疑的YARN应用程序,因此重新启动dataproc代理将解决此问题。

    # Kill the job through the UI or something before the next step. 
    # Now use "pig sh" to restart the dataproc agent 
    gcloud dataproc jobs submit pig --cluster my-test-cluster \ 
        --execute "sh systemctl restart google-dataproc-agent.service" 
    
    # Re-run your job without needing to change anything else, 
    # it'll be fine now if you ever need to resubmit it and it 
    # needs to recover from the checkpoint again. 
    

    但请记住,通过检查站的性质,这意味着你将不能够改变你传给后续运行的参数,因为检查点恢复是用来揍你的命令行设置。

    +0

    谢谢丹尼斯。清理检查点工作。因此,总而言之,要从检查点恢复:1.将jar添加到本地文件系统,并在spark作业提交中引用它; 2.在从检查点恢复之前,重新启动dataproc代理,否则会抛出各种异常(https:/ /gist.github.com/rvenkatesh25/a36e6f339febbf0a6d3b96ce5ad08fdc)。这听起来是对的吗? –

    +0

    为了更清楚地说明,您只需要在第一次从给定检查点恢复时重新启动数据程序代理,所有后续时间都将无需重​​新启动。在Dataproc中处理这个更干净的工作正在进行中,所以当然需要重新启动代理并不理想。基本上,您第一次在没有检查点的情况下运行作业时,Dataproc的“jobid”会被烧入检查点文件中;所有后续的检查点恢复运行将重新使用该旧的jobid。因此,单次重新启动会使代理忘记第一次作业运行中的jobid。 –

    +0

    即使重新启动dataproc代理技巧,我也会得到以下异常:“17/05/16 17:39:04 ERROR org.apache.spark.SparkContext:初始化SparkContext时出错。 org.apache.spark.SparkException:Yarn应用程序有已经结束!它可能已经被杀死或无法启动应用程序主控器。 \t at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)“在dataproc端有什么变化吗? –

    0

    您也可以在纱线群集模式下运行作业,以避免将jar添加到主机。潜在的折衷是火花驱动程序将在工作节点而不是主节点中运行。