Timeout (akka.pattern.AskTimeoutException) when submitting a fat jar to spark-jobserver

I have built my job jar with sbt assembly so that all of its dependencies end up in a single fat jar. When I try to submit the binary to spark-jobserver, I get an akka.pattern.AskTimeoutException.

I have already modified my configuration to allow submitting large jars (I added parsing.max-content-length = 300m), and I have also increased some of the timeout settings, but none of that helped.

After that, when I run:

curl --data-binary @matching-ml-assembly-1.0.jar localhost:8090/jars/matching-ml 

I get:

{ 
    "status": "ERROR", 
    "result": { 
    "message": "Ask timed out on [Actor[akka://JobServer/user/binary-manager#1785133213]] after [3000 ms]. Sender[null] sent message of type \"spark.jobserver.StoreBinary\".", 
    "errorClass": "akka.pattern.AskTimeoutException", 
    "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)", "akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)", "scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)", "scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:331)", "akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:282)", "akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:286)", "akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)", "java.lang.Thread.run(Thread.java:745)"] 
    }
}

My configuration:

# Template for a Spark Job Server configuration file 
# When deployed these settings are loaded when job server starts 
# 
# Spark Cluster/Job Server configuration 
spark { 
    # spark.master will be passed to each job's JobContext 
    master = "local[4]" 
    # master = "mesos://vm28-hulk-pub:5050" 
    # master = "yarn-client" 

    # Default # of CPUs for jobs to use for Spark standalone cluster 
    job-number-cpus = 4 

    jobserver { 
    port = 8090 

    context-per-jvm = false 
    # Note: JobFileDAO is deprecated from v0.7.0 because of issues in 
    # production and will be removed in future, now defaults to H2 file. 
    jobdao = spark.jobserver.io.JobSqlDAO 

    filedao { 
     rootdir = /tmp/spark-jobserver/filedao/data 
    } 

    datadao { 
     # storage directory for files that are uploaded to the server 
     # via POST/data commands 
     rootdir = /tmp/spark-jobserver/upload 
    } 

    sqldao { 
     # Slick database driver, full classpath 
     slick-driver = slick.driver.H2Driver 

     # JDBC driver, full classpath 
     jdbc-driver = org.h2.Driver 

     # Directory where default H2 driver stores its data. Only needed for H2. 
     rootdir = /tmp/spark-jobserver/sqldao/data 

     # Full JDBC URL/init string, along with username and password. Sorry, needs to match above. 
     # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass 
     jdbc { 
     url = "jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db" 
     user = "" 
     password = "" 
     } 

     # DB connection pool settings 
     dbcp { 
     enabled = false 
     maxactive = 20 
     maxidle = 10 
     initialsize = 10 
     } 
    } 
    # When using chunked transfer encoding with scala Stream job results, this is the size of each chunk 
    result-chunk-size = 1m 
    } 

    # Predefined Spark contexts 
    # contexts { 
    # my-low-latency-context { 
    #  num-cpu-cores = 1   # Number of cores to allocate. Required. 
    #  memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, 1G, etc. 
    # } 
    # # define additional contexts here 
    # } 

    # Universal context configuration. These settings can be overridden, see README.md 
    context-settings { 
    num-cpu-cores = 2   # Number of cores to allocate. Required. 
    memory-per-node = 2G   # Executor memory per node, -Xmx style eg 512m, #1G, etc. 

    # In case spark distribution should be accessed from HDFS (as opposed to being installed on every Mesos slave) 
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz" 

    # URIs of Jars to be loaded into the classpath for this context. 
    # Uris is a string list, or a string separated by commas ',' 
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"] 

    # Add settings you wish to pass directly to the sparkConf as-is such as Hadoop connection 
    # settings that don't use the "spark." prefix 
    passthrough { 
     #es.nodes = "192.1.1.1" 
    } 
    } 

    # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully 
    # home = "/home/spark/spark" 
} 

# Note that you can use this file to define settings not only for job server, 
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults. 
spray.can.server { 
    # uncomment the next lines for making this an HTTPS example 
    # ssl-encryption = on 
    # path to keystore 
    #keystore = "/some/path/sjs.jks" 
    #keystorePW = "changeit" 

    # see http://docs.oracle.com/javase/7/docs/technotes/guides/security/StandardNames.html#SSLContext for more examples 
    # typical are either SSL or TLS 
    encryptionType = "SSL" 
    keystoreType = "JKS" 
    # key manager factory provider 
    provider = "SunX509" 
    # ssl engine provider protocols 
    enabledProtocols = ["SSLv3", "TLSv1"] 
    idle-timeout = 60 s 
    request-timeout = 20 s 
    connecting-timeout = 5s 
    pipelining-limit = 2 # for maximum performance (prevents StopReading/ResumeReading messages to the IOBridge) 
    # Needed for HTTP/1.0 requests with missing Host headers 
    default-host-header = "spray.io:8765" 

    # Increase this in order to upload bigger job jars 
    parsing.max-content-length = 300m 
} 


akka { 
    remote.netty.tcp { 
    # This controls the maximum message size, including job results, that can be sent 
    # maximum-frame-size = 10 MiB 
    } 
} 

Answers:

I ran into a similar problem. The fix is a little bit tricky. First, you need to add spark.jobserver.short-timeout to your configuration, i.e. modify the jobserver section like this:

jobserver { 
    port = 8090 

    context-per-jvm = false 
    short-timeout = 60s 
    ... 
} 

The second (tricky) part is that you cannot fix this without modifying the code of the spark-jobserver application itself. The property that causes the timeout is in the BinaryManager class:

implicit val daoAskTimeout = Timeout(3 seconds) 

It defaults to 3 seconds, which is apparently not enough for a big jar. You can increase it to, for example, 60 seconds, which solved the problem for me:

implicit val daoAskTimeout = Timeout(60 seconds) 
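
For context, daoAskTimeout is an implicit akka.util.Timeout that Akka's ask (?) pattern picks up: if the actor being asked does not reply within that window, the ask fails with exactly the AskTimeoutException shown in the question. The following is a minimal, self-contained sketch of that mechanism, with a made-up SlowStore actor standing in for BinaryManager (it is not the actual spark-jobserver code):

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for BinaryManager: it simply takes a while to answer.
class SlowStore extends Actor {
  def receive: Receive = {
    case _ =>
      Thread.sleep(5000)   // simulate storing a large binary (~5 s)
      sender() ! "stored"
  }
}

object AskTimeoutSketch extends App {
  val system = ActorSystem("JobServerSketch")
  val store = system.actorOf(Props[SlowStore], "binary-manager")

  // With a 3 second timeout this ask would fail with akka.pattern.AskTimeoutException,
  // just like the error in the question; 60 seconds leaves enough room.
  implicit val daoAskTimeout: Timeout = Timeout(60.seconds)

  val reply = Await.result(store ? "StoreBinary", 70.seconds)
  println(reply)   // prints "stored"

  system.terminate()
}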

I tried it and it works, thank you.

Another answer:

Actually, you can easily reduce the size of the jar. Some of the dependent jars can be passed using dependent-jar-uris instead of being assembled into one big fat jar.
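
To make the jar smaller in an sbt assembly build, one common approach (not necessarily what this answer had in mind) is to mark the dependencies the cluster already provides, Spark and the job-server API in particular, as "provided" so that sbt-assembly leaves them out. A sketch of such a build.sbt, with illustrative artifact names and versions only:

// build.sbt -- versions below are examples, match them to your cluster

name := "matching-ml"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark is already on the workers and inside spark-jobserver,
  // so keep it out of the assembly to shrink the fat jar considerably
  "org.apache.spark" %% "spark-core"  % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.1.0" % "provided",
  // the job-server API is also provided at runtime by the job server itself
  "spark.jobserver"  %% "job-server-api" % "0.7.0" % "provided"
)

Library jars that you still need but do not want to bundle can then be listed in the dependent-jar-uris setting of context-settings (the commented-out example in the configuration above shows the expected format), as long as the referenced paths are reachable from every worker.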