2016-10-03 58 views
2

我基本上有2个保存操作要在我的数据框上执行。这项工作运行良好。但是当我看到Spark UI中的事件时间轴时,我明白第一个动作1完成,然后动作2开始并完成。如何在触发器中触发2个写操作

由于这两个操作是相互独立的,有没有什么办法可以一起执行它们。以下是我的代码。

processedDF.write.format("ORC").options(Map("path" -> 
integrationFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." 
+ hiveTableName + "_int") 
errorDF.write.format("ORC").options(Map("path" -> 
errorFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." + 
hiveTableName + "_error") 

我想同时处理“processDF”和“errorDF”写入HDFS。

回答

1

你可以在不同的线程启动这些:

new Thread() { 
     override def run(): Unit = { 
     processedDF.write.format("ORC").options(Map("path" -> 
integrationFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." 
+ hiveTableName + "_int") 
     } 
    }.start() 



    new Thread() { 
      override def run(): Unit = { 
      errorDF.write.format("ORC").options(Map("path" -> 
errorFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." + 
hiveTableName + "_error") 
      } 
     }.start() 
0

请参阅调度文档here。再现相关部分逐字:

内的规定星火应用(例如SparkContext),如果他们从不同的线程提交多个并行作业可以同时运行[...]默认情况下,星火的调度运行在FIFO工作时尚。每个工作分为“阶段”(例如地图和缩小阶段),第一项工作优先考虑所有可用资源,其阶段有任务启动,然后第二项工作得到优先考虑,等等。 Spark 0.8,也可以配置作业之间的公平共享。在公平分享下,Spark以“循环”方式在作业之间分配任务,以便所有作业获得大致相等的群集资源份额。这意味着在长时间工作时提交的短工可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长时间的工作。

val conf = new SparkConf().setMaster(...).setAppName(...) 
conf.set("spark.scheduler.mode", "FAIR") 
val sc = new SparkContext(conf) 

要启用公平调度器,只需配置SparkContext当spark.scheduler.mode属性设置为FAIR