我有一种情况,我想在Spark中的每个工人上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个过程启动一个守护进程,这个守护进程在执行其余的程序之前需要运行。理想情况下,这应该在我读取任何数据之前执行。是否可以对Apache Spark中的所有worker执行命令?
我在Spark 2.0.2上并使用动态分配。
我有一种情况,我想在Spark中的每个工人上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个过程启动一个守护进程,这个守护进程在执行其余的程序之前需要运行。理想情况下,这应该在我读取任何数据之前执行。是否可以对Apache Spark中的所有worker执行命令?
我在Spark 2.0.2上并使用动态分配。
您可以使用lazy val和Spark广播的组合来实现此目的。它将如下所示。 (下面的代码还没有编译,您可能需要改变一些东西)
object ProcessManager {
lazy val start = // start your process here.
}
你可以做你任何转换之前,在你的应用程序开始播出该对象。
val pm = sc.broadcast(ProcessManager)
现在,您可以像转换任何其他广播变量一样访问此对象,并调用lazy val。
rdd.mapPartition(itr => {
pm.value.start
// Other stuff here.
}
带有静态初始化的object
,它调用你的系统进程应该有所斩断。
object SparkStandIn extends App {
object invokeSystemProcess {
import sys.process._
val errorCode = "echo Whatever you put in this object should be executed once per jvm".!
def doIt(): Unit = {
// this object will construct once per jvm, but objects are lazy in
// another way to make sure instantiation happens is to check that the errorCode does not represent an error
}
}
invokeSystemProcess.doIt()
invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
但你如何确保在每次转换时不重复调用而实际初始化? – 2016-11-29 20:40:33
重复的:http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –