2013-02-24 103 views
1

我需要对所有 reduce 任务的结果做一次汇总。基本上,每个 reduce 任务会求出某个值的总和与计数;我需要把所有任务的总和与计数分别相加,再算出最终的平均值。也就是说,需要在主程序(driver)和 reduce 任务之间共享数据。

我尝试在 reduce 中通过 conf.setInt 保存这些值,但在 main 函数中读取它们时失败了。

class Main { 

/**
 * Reducer from the question: reads the "fd" and "fc" values from the task's
 * Configuration.
 *
 * NOTE(review): this method only reads the configuration (getInt). It never calls
 * set on it, never writes any output, and the local `i` is unused. Per Hadoop's
 * design, a task's copy of the Configuration is not sent back to the driver, so
 * job-wide totals are normally reported through counters instead (confirm against
 * the Hadoop documentation).
 */
public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> { 

    public void reduce(Text key, Iterable<Text> values, 
      Context context 
      ) throws IOException, InterruptedException { 
     int i = 0; 
     int fd = 0, fc = 0; 
     // getInt returns the given fallback (-1) when the key is not set in the configuration.
     fd = context.getConfiguration().getInt("fd", -1); 
     fc = context.getConfiguration().getInt("fc", -1); 
     //when I check the value of fd, fc here they are fine. fc fd is shared across all reduce tasks and the updated value is seen by all reduce task. Only main function doesnt seem to have access to it. 
     // NOTE(review): nothing in this method updates fd/fc; each task only sees what the
     // driver placed in the job configuration before submission.
    } 
} 

/**
 * Driver from the question: configures the "Flight Data" job and then tries to read
 * "fc" and "fd" back from the job's Configuration.
 *
 * NOTE(review): as shown, the job is never submitted (there is no waitForCompletion or
 * submit call), so no reducer has run when the values are read. Only "fc" is set in
 * this driver. flightCount and flightDelay are not declared in the visible code; if they
 * are ints, the division below truncates and throws ArithmeticException when
 * flightCount is 0.
 */
public static void main(String[] args) throws Exception{ 
    Configuration conf = new Configuration(); 
    conf.setInt("fc", 5); 

    Job job = new Job(conf, "Flight Data"); 
    job.setJarByClass(FlightData.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(MyReducer.class); 

    // Secondary-sort setup: custom partitioner, grouping and sort comparators.
    job.setPartitionerClass(FirstPartitioner.class); 
    job.setGroupingComparatorClass(GroupComparator.class); 
    job.setSortComparatorClass(KeyComparator.class); 


    job.setNumReduceTasks(10); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 


    // NOTE(review): these reads happen without the job having been run, so they can only
    // return what this driver put into the configuration (or the -1 fallback).
    flightCount = job.getConfiguration().getInt("fc", -1); 
    flightDelay = job.getConfiguration().getInt("fd", -1); 
    //here when I access fc, fd, I get back 5 & 5 
    System.out.println("Final " + flightCount +" " + flightDelay+ " " + flightDelay/flightCount); 
} 
+0

你遇到了什么错误?另外,能否把你使用的编程语言添加为标签? – 2013-02-24 02:43:00

回答

0

使用新 API(org.apache.hadoop.mapreduce)时,重写 Mapper 和 Reducer 的 run() 方法。在这些方法中,可以让每个 Mapper 或 Reducer 在处理完全部输入之后,再输出其累计的总和/计数。

此外,需要把 reduce 任务数限制为 1,这样才能把多个 Mapper 各自产生的部分总和汇总成一个全局总和。

下面的代码更清楚地展示了这种做法:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class AggregationExample extends Configured implements Tool { 

    /** 
    * This is Mapper. 
    * 
    */ 
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> { 

     private Text outputKey = new Text(); 
     private Text outputValue = new Text(); 
     private double sum; 

     @Override 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

      try { 
       // say that you need to sum up the value part 
       sum+= Double.valueOf(value); 
     } 

     @Override 
     public void run(Context context) throws IOException, InterruptedException { 

      setup(context); 
      while (context.nextKeyValue()) { 
       map(context.getCurrentKey(), context.getCurrentValue(), context); 
      } 

      // emit out the sum per mapper 
      outputKey.set(sum); 
      context.write(outputKey, outputValue);// Notice that the outputValue is empty 
      cleanup(context); 

     } 
    } 

    /** 
    * This is Reducer. 
    * 
    */ 
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> { 

     private Text outputKey = new Text(); 
     private Text outputValue = new Text(); 
     private double sum; 

     @Override 
     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
       InterruptedException { 


      // summation of values from each mapper 
      sum += Double.valueOf(key.toString()); 

     } 

     @Override 
     public void run(Context context) throws IOException, InterruptedException { 

      setup(context); 
      while (context.nextKey()) { 
       reduce(context.getCurrentKey(), context.getValues(), context); 
      } 

      // emit out the global sums 
      outputKey.set(sum); 
      context.write(outputKey, outputValue); 
      cleanup(context); 
     } 
    } 

    @Override 
    public int run(String[] args) throws Exception { 

     try { 
      Configuration conf = getConf(); 

      // output key and value separator is empty as in final output only 
      // key is emitted and value is empty 
      conf.set("mapred.textoutputformat.separator", ""); 

      // Configuring mapred to have just one reducer as we need to find 
      // single sum values from all the inputs 
      conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); 
      conf.setInt("mapred.reduce.tasks", 1); 

      Job job = new Job(conf); 

      job.setJarByClass(AggregationExample.class); 
      job.setJobName("Aggregation Example"); 

      job.setMapperClass(MapJob.class); 
      job.setReducerClass(ReduceJob.class); 
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(Text.class); 

      job.setInputFormatClass(TextInputFormat.class); 
      job.setOutputFormatClass(TextOutputFormat.class); 
      job.setMapOutputKeyClass(Text.class); 
      job.setMapOutputValueClass(Text.class); 
      FileInputFormat.setInputPaths(job, args[0]); 
      FileOutputFormat.setOutputPath(job, new Path(args[1])); 

      boolean success = job.waitForCompletion(true); 
      return success ? 0 : 1; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 

     if (args.length < 2) { 
      System.out 
        .println("Usage: AggregationExample <comma sparated list of input directories> <output dir>"); 
      System.exit(-1); 
     } 

     int result = ToolRunner.run(new AggregationExample(), args); 
     System.exit(result); 
    } 

} 

你应该可以把这种方法套用到你自己的问题上。

0

找到了解决方案:我使用了计数器(Counter)。

http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/

公共类FlightData {

//enum for counters used by reducers 
public static enum FlightCounters { 
    FLIGHT_COUNT, 
    FLIGHT_DELAY; 
} 
public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> { 

    public void reduce(Text key, Iterable<Text> values, 
      Context context 
      ) throws IOException, InterruptedException { 


     delay1 = Float.parseFloat(origin[5]); 
     delay2 = Float.parseFloat(dest[5]); 
     context.getCounter(FlightCounters.FLIGHT_COUNT).increment(1); 
     context.getCounter(FlightCounters.FLIGHT_DELAY) 
     .increment((long) (delay1 + delay2)); 

    } 
} 
public static void main(String[] args) throws Exception{ 
    float flightCount, flightDelay; 
    job.waitForCompletion(true); 
    //get the final results updated in counter by all map and reduce tasks 
    flightCount = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_COUNT).getValue(); 
    flightDelay = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_DELAY).getValue(); 
} 

}