2016-11-29 52 views
3

我有一种情况,我想在Spark中的每个工人上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个过程启动一个守护进程,这个守护进程在执行其余的程序之前需要运行。理想情况下,这应该在我读取任何数据之前执行。是否可以对Apache Spark中的所有worker执行命令?

我在Spark 2.0.2上并使用动态分配。

+0

重复的:http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –

回答

5

您可以使用lazy val和Spark广播的组合来实现此目的。它将如下所示。 (下面的代码还没有编译,您可能需要改变一些东西)

object ProcessManager { 
    lazy val start = // start your process here. 
} 

你可以做你任何转换之前,在你的应用程序开始播出该对象。

val pm = sc.broadcast(ProcessManager) 

现在,您可以像转换任何其他广播变量一样访问此对象,并调用lazy val。

rdd.mapPartition(itr => { 
    pm.value.start 
    // Other stuff here. 
} 
+0

这不会为每个分区执行一次该过程,而且每个工作者不会执行一次? – Jon

+0

你是对的,那只是一个例子。但由于它是一个惰性val,ProcessManager是一个“对象”,它只能在执行器中运行一次。 – Jegan

+0

广播该对象有点奇怪。你应该广播数据,而不是代码。只需拥有对象并访问启动变量就足够了。这样您就不需要ProcessManager对象可序列化。 – Atreys

2

带有静态初始化的object,它调用你的系统进程应该有所斩断。

object SparkStandIn extends App { 
    object invokeSystemProcess { 
    import sys.process._ 
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".! 

    def doIt(): Unit = { 
     // this object will construct once per jvm, but objects are lazy in 
     // another way to make sure instantiation happens is to check that the errorCode does not represent an error 
    } 
    } 
    invokeSystemProcess.doIt() 
    invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once 
} 
+0

但你如何确保在每次转换时不重复调用而实际初始化? – 2016-11-29 20:40:33

相关问题