2017-09-28

I am trying to create an EMR cluster and run a Spark job on it. How do I get the status of a step in an AWS EMR cluster?

I need to start a thread that tells me when my job's step has completed, while the EMR cluster is up and running.

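import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

{
    // configureEMRClient(), extractorMainClass and resourceExtractorJar are defined elsewhere.
    AmazonElasticMapReduceClient emrClient = configureEMRClient();

    StepFactory stepFactory = new StepFactory();

    // Step 1: enable Hadoop debugging; terminate the cluster if this step fails.
    StepConfig enableDebugging = new StepConfig()
        .withName("Enable Debugging")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

    // Step 2: run the Spark job through command-runner.jar (spark-submit on YARN, cluster mode).
    // Each withArgs call appends to the argument list.
    HadoopJarStepConfig runExampleConfig = new HadoopJarStepConfig()
        .withJar("command-runner.jar")
        .withArgs("spark-submit")
        .withArgs("--deploy-mode", "cluster")
        .withArgs("--master", "yarn")
        .withArgs("--class", extractorMainClass)
        .withArgs("--num-executors", "3")
        .withArgs("--driver-memory", "8g")
        .withArgs(resourceExtractorJar)
        .withArgs("st")
        .withArgs("ap");

    StepConfig customExampleStep = new StepConfig()
        .withName("Example Step")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(runExampleConfig);

    Application spark = new Application().withName("Spark");

    RunJobFlowRequest request = new RunJobFlowRequest()
        .withName("Test")
        .withReleaseLabel("emr-5.5.0")
        .withSteps(enableDebugging, customExampleStep)
        .withApplications(spark)
        .withServiceRole("EMR_DefaultRole")
        .withJobFlowRole("EMR_EC2_DefaultRole")
        .withInstances(new JobFlowInstancesConfig()
            .withEc2KeyName("keypair")
            .withInstanceCount(2)
            // false: the cluster shuts down once the last step has finished.
            .withKeepJobFlowAliveWhenNoSteps(false)
            .withMasterInstanceType("m3.xlarge")
            .withSlaveInstanceType("m3.xlarge"));

    RunJobFlowResult result = emrClient.runJobFlow(request);

    // The cluster (job flow) ID is needed later to query step status.
    String jobFlowId = result.getJobFlowId();
}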
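command-runner.jar runs the command given in the step's args, so runExampleConfig amounts to a single spark-submit invocation. The sketch below rebuilds the same argument list with plain Java collections and prints it, which can help when checking what the step will run. The class name and the placeholder main class and JAR path are hypothetical stand-ins for extractorMainClass and resourceExtractorJar.

```java
import java.util.List;

public class SparkSubmitArgs {
    // Builds the same argument list that runExampleConfig passes to command-runner.jar.
    static List<String> buildArgs(String mainClass, String jar) {
        return List.of(
            "spark-submit",
            "--deploy-mode", "cluster",
            "--master", "yarn",
            "--class", mainClass,
            "--num-executors", "3",
            "--driver-memory", "8g",
            jar,
            "st", "ap");
    }

    public static void main(String[] args) {
        // Hypothetical placeholders for extractorMainClass and resourceExtractorJar.
        List<String> cmd = buildArgs("com.example.Main", "s3://my-bucket/extractor.jar");
        System.out.println(String.join(" ", cmd));
    }
}
```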

To get the step status, I use this polling loop:

while (true) {
    DescribeClusterRequest desc = new DescribeClusterRequest()
        .withClusterId(jobFlowId);
    DescribeClusterResult clusterResult = emrClient.describeCluster(desc);
    com.amazonaws.services.elasticmapreduce.model.Cluster cluster = clusterResult.getCluster();

    // Only the cluster ID is set on this request; no step ID (withStepId) is given.
    DescribeStepRequest d = new DescribeStepRequest().withClusterId(jobFlowId);
    DescribeStepResult r = emrClient.describeStep(d);

    Step ss = r.getStep();

    String status = ss.getStatus().getState();

    System.out.printf("Status: %s%n", status);
    // Note: TERMINATED and TERMINATED_WITH_ERRORS are cluster states, not step states.
    if (status.equals(ClusterState.TERMINATED.toString())
            || status.equals(ClusterState.TERMINATED_WITH_ERRORS.toString())) {
        System.out.println("Terminated");
        break;
    }
    try {
        TimeUnit.SECONDS.sleep(30);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // other handling could go here
}

I get the following error:

Exception in thread "main" com.amazonaws.services.elasticmapreduce.model.InvalidRequestException: Step id 'null' is not valid. (Service: AmazonElasticMapReduce; Status Code: 400; Error Code: InvalidRequestException; Request ID: 98e1b43e-a440-11e7-920b-158124595c35) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) 
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) 
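The exception reports a null step ID because the loop's DescribeStepRequest sets only the cluster ID, while DescribeStep also requires a step ID. The loop also compares the step's state with ClusterState values (TERMINATED, TERMINATED_WITH_ERRORS), which a step never reports; EMR step states are PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED and INTERRUPTED. Below is a minimal sketch of the corrected loop logic, decoupled from the SDK so it runs on its own: the state lookup is passed in as a Supplier<String>. In real SDK v1 code that supplier could presumably be `() -> emrClient.describeStep(new DescribeStepRequest().withClusterId(jobFlowId).withStepId(stepId)).getStep().getStatus().getState()`, where stepId is assumed to be obtained elsewhere (for example from ListSteps). StepPoller and its methods are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class StepPoller {
    // Step states after which a step no longer changes (EMR step state names).
    static final Set<String> TERMINAL_STEP_STATES =
        Set.of("COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED");

    static boolean isTerminal(String state) {
        return TERMINAL_STEP_STATES.contains(state);
    }

    // Reads the step state until it is terminal, sleeping between reads; returns the final state.
    // In real code, stepState would wrap a DescribeStep call that sets both cluster ID and step ID.
    static String waitForStep(Supplier<String> stepState, long interval, TimeUnit unit) {
        while (true) {
            String state = stepState.get();
            System.out.printf("Status: %s%n", state);
            if (isTerminal(state)) {
                return state;
            }
            try {
                unit.sleep(interval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                throw new IllegalStateException("Interrupted while waiting for step", e);
            }
        }
    }

    public static void main(String[] args) {
        // Fake sequence of states standing in for successive DescribeStep responses.
        Iterator<String> fake = List.of("PENDING", "RUNNING", "RUNNING", "COMPLETED").iterator();
        String finalState = waitForStep(fake::next, 10, TimeUnit.MILLISECONDS);
        System.out.println("Final state: " + finalState);
    }
}
```

Passing the lookup in as a Supplier keeps the stopping condition testable without a live cluster; swapping in the real DescribeStep call does not change the loop.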

How can I get the status of a step inside a deployed EMR cluster?

Answer

Answering my own question, since I figured it out:

ListStepsResult steps = emrClient.listSteps(new ListStepsRequest().withClusterId(jobFlowId));
// ListSteps returns steps most recent first, so index 0 is the last step that was added.
StepSummary step = steps.getSteps().get(0);
System.out.println(step.getStatus().getState());
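Because ListSteps lists the most recent step first, get(0) is only the step of interest while it is the last one submitted. To cover the original goal of being notified from a separate thread, here is a minimal sketch that selects the step by name instead of by index and polls on a ScheduledExecutorService, completing a CompletableFuture once the step reaches a terminal state. StepInfo and the Supplier<List<StepInfo>> are hypothetical stand-ins for StepSummary and the ListSteps call; in real code the supplier would presumably call emrClient.listSteps(new ListStepsRequest().withClusterId(jobFlowId)).getSteps() and map each StepSummary's getName() and getStatus().getState().

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class StepWatcher {
    // Hypothetical stand-in for an EMR StepSummary: just the name and the state string.
    static final class StepInfo {
        final String name;
        final String state;
        StepInfo(String name, String state) { this.name = name; this.state = state; }
    }

    static final Set<String> TERMINAL = Set.of("COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED");

    // Finds a step by name rather than relying on its position in the list.
    static Optional<StepInfo> findByName(List<StepInfo> steps, String name) {
        return steps.stream().filter(s -> s.name.equals(name)).findFirst();
    }

    // Polls in the background; the returned future completes with the step's final state.
    static CompletableFuture<String> watch(Supplier<List<StepInfo>> listSteps, String stepName,
                                           ScheduledExecutorService scheduler, long periodMillis) {
        CompletableFuture<String> done = new CompletableFuture<>();
        ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(() -> {
            try {
                findByName(listSteps.get(), stepName)
                    .filter(s -> TERMINAL.contains(s.state))
                    .ifPresent(s -> done.complete(s.state));
            } catch (RuntimeException e) {
                done.completeExceptionally(e); // e.g. an API error while listing steps
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        // Stop polling once the outcome is known.
        done.whenComplete((state, err) -> task.cancel(false));
        return done;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fake ListSteps: "Example Step" runs for two polls, then completes.
        Supplier<List<StepInfo>> fakeListSteps = () -> {
            String state = calls.incrementAndGet() < 3 ? "RUNNING" : "COMPLETED";
            return List.of(new StepInfo("Example Step", state),
                           new StepInfo("Enable Debugging", "COMPLETED"));
        };
        String finalState = watch(fakeListSteps, "Example Step", scheduler, 10)
            .get(5, TimeUnit.SECONDS);
        System.out.println("Example Step finished: " + finalState);
        scheduler.shutdown();
    }
}
```

Instead of blocking with get(), the caller can attach a callback such as thenAccept to the returned future, so the main thread stays free while the cluster runs.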