2016-02-26 116 views
1

我希望持续保留Spark任务中的RDD,以便所有使用Spark Job Server的后续作业都可以使用它。这是我曾尝试:在Spark Job Server中持久/共享RDD

工作1:

package spark.jobserver 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 

object FirstJob extends SparkJob with NamedRddSupport { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob") 
    val sc = new SparkContext(conf) 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + results) 
    } 

    override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid 

    override def runJob(sc: SparkContext, config: Config): Any = { 

    // the below variable is to be accessed by other jobs: 
    val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text")) 

    this.namedRdds.update("resultsRDD", to_be_persisted) 
    return to_be_persisted 
    } 
} 

工作2:

package spark.jobserver 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 


object NextJob extends SparkJob with NamedRddSupport { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob") 
    val sc = new SparkContext(conf) 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + results) 
    } 

    override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid 

    override def runJob(sc: SparkContext, config: Config): Any = { 

    val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get 
    rdd 
    } 
} 

我得到的错误是:

{ 
    "status": "ERROR", 
    "result": { 
    "message": "None.get", 
    "errorClass": "java.util.NoSuchElementException", 
    "stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"] 
    } 

请修改上面的代码,以便to_be_persisted是可访问的。 感谢

编辑:

产生火花背景下,编制和使用的包装阶源后:

curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m' 

调用FirstJob和NextJob使用:

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true' 

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true' 
+0

如果您不得不从NamedRDD中受益,则必须在同一上下文中运行所有作业。你在做那个吗? – noorul

+0

是的,我为这两个工作使用了相同的环境。 – vdep

回答

5

似乎有两个问题在这里:

  1. 如果您正在使用最新的火花jobserver版本(0.6.2 -SNAPSHOT),还有约NamedObjects工作不正常开放的bug - 似乎适合你的描述:https://github.com/spark-jobserver/spark-jobserver/issues/386

  2. 你也有很小的类型不匹配 - 在FirstJob你坚持一个RDD[String],并在NextJob你想获取一个RDD[(String, String)] - 在NextJob,应阅读val rdd = this.namedRdds.get[String]("resultsRDD").get)。

我已经试过你的代码火花jobserver版本0.6.0,并与上面说小型修复,和它的作品。

+0

谢谢Tzach,它的工作原理。 (它也适用于spark 1.5.2和sjs 0.6.1) – vdep