2017-06-07 227 views
0

我正在致力于应用程序从java应用程序执行spark批量应用程序。从Java应用程序启动并监控Spark应用程序

有一个主线程启动线程来启动spark应用程序。它使用zookeeper在将启动spark应用程序的机器中找到leaderMain方法是这样的:

public static void main(String[] args) throws IOException { 

     final int id = Integer.valueOf(args[0]); 
     final String zkURL = args[1]; 

     final ExecutorService service = Executors.newSingleThreadExecutor(); 

     final Future<?> status = service.submit(new ProcessNode(id, zkURL)); 
     try { 
      status.get(); 
     } catch (InterruptedException | ExecutionException e) { 
      LOG.fatal(e.getMessage(), e); 
      service.shutdown(); 
     } 

一旦leader选择,下面的代码将在其上运行启动spark应用。

protected Boolean executeCommand() { 
    try { 
     final Runtime rt = Runtime.getRuntime(); 
     final Process proc = rt.exec("sh start-sparkapp.sh"); 
     final int exitVal = proc.waitFor(); 
     BufferedReader buf = new BufferedReader(new InputStreamReader(proc.getInputStream())); 
     String line = ""; 
     while ((line=buf.readLine())!=null) { 
     System.out.println(line); 
     } 

     System.out.println(" commandToExecute exited with code: " + exitVal); 
     proc.destroy(); 

    } catch (final Exception e) { 
     System.out.println("Exception occurred while Launching process : " + e.getMessage()); 
     return Boolean.FALSE; 
    } 
     return Boolean.TRUE; 
} 

但是这开始长时间运行spark工作。所以我相信,只有当spark工作完成时,代码的下一部分才会被执行。我的要求是,只要spark应用程序启动,控制权转到代码的下一部分,我在监视相同的spark应用程序的状态。即我启动了spark应用程序,并从相同的java应用程序监视spark应用程序的状态。 假设我有一个方法montior用于监视应用

public String monitor(ApplicationId id) 

任何建议的状态如何实现这一目标?

+0

标记“apache-spark”是否合理? – suj1th

回答

0

由于您将使用方法public String monitor(ApplicationId id)来监视Spark应用程序,因此我假定您不希望当前线程在使用proc.waitFor()的进程上等待。另外,您不希望将流程的正常输出打印到控制台。这两个操作都会使您的线程在生成的进程中等待。而且,你的监视器方法应该把生成的进程的进程id,而不是spark applicationId作为输入。 因此,修改后的代码可能如下所示:

protected Boolean executeCommand() { 
try { 
    final Runtime rt = Runtime.getRuntime(); 
    final Process proc = rt.exec("sh start-sparkapp.sh"); 

    /* 
    *Call to method monitor(ProcessId id) 
    */ 

    } catch (final Exception e) { 
     System.out.println("Exception occurred while Launching process : " + e.getMessage()); 
     return Boolean.FALSE; 
    } 
    return Boolean.TRUE; 
}