2013-05-31 53 views
0

我想为 Hadoop 设置配置项 textinputformat.record.delimiter=;。如何使用 Java 代码设置 Amazon AMI 上的 Hadoop 配置?

目前我使用下面的代码在 AMI 上运行 Pig 脚本。有人知道如何在下面这段代码中设置该配置吗?

代码:

// Step 1: install Pig. A failure of this step terminates the whole job flow.
String onInstallFailure = ActionOnFailure.TERMINATE_JOB_FLOW.name();
StepConfig installPig = new StepConfig()
        .withName("Install Pig")
        .withHadoopJarStep(stepFactory.newInstallPigStep())
        .withActionOnFailure(onInstallFailure);

// Step 2: run the Pig script stored in S3, passing `input` and `output` as -p arguments.
String[] scriptArgs = { "-p", input, "-p", output };
String onScriptFailure = ActionOnFailure.CANCEL_AND_WAIT.name();
StepConfig runPigLatinScript = new StepConfig()
        .withName("Run Pig Script")
        .withHadoopJarStep(stepFactory.newRunPigScriptStep("s3://pig/script.pig", scriptArgs))
        .withActionOnFailure(onScriptFailure);

// Instance settings for the cluster; the job flow is not kept alive once no steps remain.
JobFlowInstancesConfig jobFlowInstances = new JobFlowInstancesConfig()
        .withMasterInstanceType(this.masterType)
        .withSlaveInstanceType(this.slaveType)
        .withInstanceCount(this.count)
        .withEc2KeyName(this.ec2KeyName)
        .withKeepJobFlowAliveWhenNoSteps(false);

// Assemble the job flow request: both steps, the log location, the AMI version and the instances.
RunJobFlowRequest request = new RunJobFlowRequest()
        .withName(jobFlowName)
        .withLogUri(logUri)
        .withAmiVersion("2.3.2")
        .withSteps(installPig, runPigLatinScript)
        .withInstances(jobFlowInstances);

// Submit the job flow.
RunJobFlowResult runJobFlowResult = this.amazonEmrClient.runJobFlow(request);

回答

2

你需要创建一个 BootstrapActionConfig,并把它添加到正在创建的 RunJobFlowRequest 中,这样自定义的 Hadoop 配置就会被应用到集群上。

下面是我在这里(here)的代码基础上修改后、满足你需求的完整代码:

import java.util.ArrayList; 
import java.util.List; 

import com.amazonaws.auth.AWSCredentials; 
import com.amazonaws.auth.BasicAWSCredentials; 
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient; 
import com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig; 
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig; 
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest; 
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult; 
import com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig; 
import com.amazonaws.services.elasticmapreduce.model.StepConfig; 
import com.amazonaws.services.elasticmapreduce.util.StepFactory; 

/**
 * Launches an Amazon Elastic MapReduce job flow whose request carries a
 * bootstrap action that passes a custom Hadoop setting to the cluster.
 *
 * <p>The job flow contains two steps (enable debugging, install Hive). The
 * setting {@code textinputformat.record.delimiter=;} is handed to the
 * {@code configure-hadoop} bootstrap script with the {@code -s} flag.
 *
 * @author amar
 */
public class RunEMRJobFlow {

    /** S3 location of the bootstrap script that receives the Hadoop settings. */
    private static final String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";

    /**
     * Builds the job flow request, submits it and prints the id of the
     * job flow that was started.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {

        // Placeholders: real AWS credentials must be filled in before running.
        String accessKey = "";
        String secretKey = "";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

        StepFactory stepFactory = new StepFactory();

        StepConfig enabledebugging = new StepConfig()
                .withName("Enable debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        StepConfig installHive = new StepConfig()
                .withName("Install Hive")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newInstallHiveStep());

        // Arguments for the bootstrap script: "-s" followed by the key=value pair to set.
        List<String> hadoopConfigArgs = new ArrayList<String>();
        hadoopConfigArgs.add("-s");
        hadoopConfigArgs.add("textinputformat.record.delimiter=;");

        BootstrapActionConfig hadoopBootstrapConfig = createBootstrapAction("Set Hadoop Config",
                CONFIG_HADOOP_BOOTSTRAP_ACTION, hadoopConfigArgs);

        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                .withEc2KeyName("keypair")
                .withHadoopVersion("0.20")
                .withInstanceCount(5)
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m1.small")
                .withSlaveInstanceType("m1.small");

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withBootstrapActions(hadoopBootstrapConfig)
                .withName("Hive Interactive")
                .withSteps(enabledebugging, installHive)
                .withLogUri("s3://myawsbucket/")
                .withInstances(instances);

        RunJobFlowResult result = emr.runJobFlow(request);

        // The job flow is requested with keepJobFlowAliveWhenNoSteps(true), so report
        // its id instead of discarding the result: the id is what identifies the
        // job flow afterwards.
        System.out.println("Started job flow: " + result.getJobFlowId());
    }

    /**
     * Creates a named bootstrap action that runs the script at the given path.
     *
     * @param bootstrapName name given to the bootstrap action
     * @param bootstrapPath location of the script to run
     * @param args arguments for the script; when {@code null}, no arguments are set
     * @return the bootstrap action configuration wrapping the script
     */
    private static BootstrapActionConfig createBootstrapAction(String bootstrapName, String bootstrapPath,
            List<String> args) {

        ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
        bootstrapScriptConfig.setPath(bootstrapPath);

        // Arguments are optional; leave the script config untouched when none are given.
        if (args != null) {
            bootstrapScriptConfig.setArgs(args);
        }

        BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
        bootstrapConfig.setName(bootstrapName);
        bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

        return bootstrapConfig;
    }

}
相关问题