2014-04-07 54 views
0

我有我的hadoop程序,如下所示。我提供了相关代码的片段。我把通过读取BiG_DATA的参数传递给main。主要内容是“处理大数据”。但是对于RowPreMap类中的map方法,BIG_DATA的值是其初始值为false。不知道为什么会这样。我错过了什么吗?这在我独立运行的机器上运行时会起作用,但是当我在hadoop群集上执行此操作时不起作用。作业由JobControl处理。线程是什么东西?在Hadoop集群中使用另一个类的静态变量

公共类UVDriver扩展配置的实现工具{

public static class RowMPreMap extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 

     private Text keyText = new Text(); 
     private Text valText = new Text(); 

     public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) 
       throws IOException { 

      // Input: (lineNo, lineContent) 

      // Split each line using seperator based on the dataset. 
      String line[] = null; 
      if (Settings.BIG_DATA) 
       line = value.toString().split("::"); 
      else 
       line = value.toString().split("\\s"); 

      keyText.set(line[0]); 
      valText.set(line[1] + "," + line[2]); 

      // Output: (userid, "movieid,rating") 
      output.collect(keyText, valText); 

     } 
    } 

    public static class Settings { 

     public static boolean BIG_DATA = false; 

     public static int noOfUsers = 0; 
     public static int noOfMovies = 0; 

     public static final int noOfCommonFeatures = 10; 
     public static final int noOfIterationsRequired = 3; 
     public static final float INITIAL_VALUE = 0.1f; 

     public static final String NORMALIZE_DATA_PATH_TEMP = "normalize_temp"; 
     public static final String NORMALIZE_DATA_PATH = "normalize"; 
     public static String INPUT_PATH = "input"; 
     public static String OUTPUT_PATH = "output"; 
     public static String TEMP_PATH = "temp"; 

    } 

    public static class Constants { 

     public static final int BIG_DATA_USERS = 71567; 
     public static final int BIG_DATA_MOVIES = 10681; 
     public static final int SMALL_DATA_USERS = 943; 
     public static final int SMALL_DATA_MOVIES = 1682; 

     public static final int M_Matrix = 1; 
     public static final int U_Matrix = 2; 
     public static final int V_Matrix = 3; 
    } 

    public int run(String[] args) throws Exception { 

     // 1. Pre-process the data. 
     // a) Normalize 
     // 2. Initialize the U, V Matrices 
     // a) Initialize U Matrix 
     // b) Initialize V Matrix 
     // 3. Iterate to update U and V. 

     // Write Job details for each of the above steps. 

     Settings.INPUT_PATH = args[0]; 
     Settings.OUTPUT_PATH = args[1]; 
     Settings.TEMP_PATH = args[2]; 
     Settings.BIG_DATA = Boolean.parseBoolean(args[3]); 

     if (Settings.BIG_DATA) { 
      System.out.println("Working on BIG DATA."); 
      Settings.noOfUsers = Constants.BIG_DATA_USERS; 
      Settings.noOfMovies = Constants.BIG_DATA_MOVIES; 
     } else { 
      System.out.println("Working on Small DATA."); 
      Settings.noOfUsers = Constants.SMALL_DATA_USERS; 
      Settings.noOfMovies = Constants.SMALL_DATA_MOVIES; 
     } 

      // some code here 

      handleRun(control); 


     return 0; 
    } 

    public static void main(String args[]) throws Exception { 

     System.out.println("Program started"); 
     if (args.length != 4) { 
      System.err 
        .println("Usage: UVDriver <input path> <output path> <fs path>"); 
      System.exit(-1); 
     } 

     Configuration configuration = new Configuration(); 
     String[] otherArgs = new GenericOptionsParser(configuration, args) 
       .getRemainingArgs(); 
     ToolRunner.run(new UVDriver(), otherArgs); 
     System.out.println("Program complete."); 
     System.exit(0); 
    } 

} 

作业控制。

public static class JobRunner implements Runnable { 
     private JobControl control; 

     public JobRunner(JobControl _control) { 
      this.control = _control; 
     } 

     public void run() { 
      this.control.run(); 
     } 
    } 

    public static void handleRun(JobControl control) 
      throws InterruptedException { 
     JobRunner runner = new JobRunner(control); 
     Thread t = new Thread(runner); 
     t.start(); 

     int i = 0; 
     while (!control.allFinished()) { 
      if (i % 20 == 0) { 
       System.out 
         .println(new Date().toString() + ": Still running..."); 
       System.out.println("Running jobs: " 
         + control.getRunningJobs().toString()); 
       System.out.println("Waiting jobs: " 
         + control.getWaitingJobs().toString()); 
       System.out.println("Successful jobs: " 
         + control.getSuccessfulJobs().toString()); 
      } 
      Thread.sleep(1000); 
      i++; 
     } 

     if (control.getFailedJobs() != null) { 
      System.out.println("Failed jobs: " 
        + control.getFailedJobs().toString()); 
     } 
    } 
+0

看起来,在使用Hadoop分布式模式时,使用不同类的静态变量并不是一个好习惯。只有通过将它们设置为配置对象才能访问它们。 – TechCrunch

+0

是的。把你的变量放在JobConf中,并在Mapper中重写'configure'来访问它们。 http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/mapred/Mapper.html – Scott

+0

我已经完全按照这种方式使用它,它现在可以工作。我试图理解为什么后面提到的方式不起作用。 – TechCrunch

回答

1

这不会起作用,因为static修饰符的范围不跨JVM(少得多的网络。)

map任务始终运行在一个单独的JVM的多个实例跨越,即使它在本地运行到工具运行器。映射器类仅使用类名实例化,并且无法访问您在工具运行器中设置的信息。

这是存在配置框架的原因之一。

相关问题