2017-09-24 29 views
2

我有这个主...使用自定义组合器...它可能被忽略?

job.setMapperClass(AverageIntMapper.class); 
    job.setCombinerClass(AverageIntCombiner.class); 
    job.setReducerClass(AverageIntReducer.class); 

与组合有不同的代码,但该组合被完全忽略的减速器使用输出从映射器输出。

我明白一个Combiner可能不会被使用,但我认为这是Combiner和Reducer一样的情况。我真的不明白能够创建自定义组合器的意义,但系统仍然可以跳过它的使用。

如果不应该发生这种情况,可能是组合器没有被使用的原因是什么?

代码...

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class AverageInt { 

public static class AverageIntMapper extends Mapper<LongWritable, Text, Text, Text> { 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

     String n_string = value.toString(); 
     context.write(new Text("Value"), new Text(n_string)); 
    } 
} 

public static class AverageIntCombiner extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 

     int sum = 0; 
     int count = 0; 

     for(IntWritable value : values) { 
      int temp = Integer.parseInt(value.toString()); 
      sum += value.get(); 
      count += 1; 
     } 

     String sum_count = Integer.toString(sum) + "," + Integer.toString(count); 

     context.write(key, new Text(sum_count)); 
    } 
} 

public static class AverageIntReducer extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 

     int total = 0; 
     int count = 0; 

     for(Text value : values) { 
      String temp = value.toString(); 
      String[] split = temp.split(","); 
      total += Integer.parseInt(split[0]); 
      count += Integer.parseInt(split[1]); 
     } 

     Double average = (double)total/count; 

     context.write(key, new Text(average.toString())); 
    } 
} 

public static void main(String[] args) throws Exception { 

    if(args.length != 2) { 
     System.err.println("Usage: AverageInt <input path> <output path>"); 
     System.exit(-1); 
    } 

    Job job = new Job(); 
    job.setJarByClass(AverageInt.class); 
    job.setJobName("Average"); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(AverageIntMapper.class); 
    job.setCombinerClass(AverageIntCombiner.class); 
    job.setReducerClass(AverageIntReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

你怎么知道它被忽略?组合器没有数量吗? –

+0

@BinaryNerd在Combiner中创建的值有一个分隔符,我尝试在Reducer中分割,但当我尝试分割时,出现数组越界的错误。如果我删除了在Reducer中分割的逻辑并输出它作为输入获取的值,那么Mapper会输出这些值。 – cpd1

+0

我会发布你的代码,否则它不可能有人能帮助你。 –

回答

1

如果你看一下你的映射器发射:

public void map(LongWritable key, Text value, Context context)

它发送两个Text对象,但同时你声明组合类本身正确地说,减少方法有:

public void reduce(Text key, Iterable<IntWritable> values, Context context)

它应该是:

public void reduce(Text key, Iterable<Text> values, Context context)

+0

这看起来像解决了问题。我想我没有注意到这个问题,因为编译/执行时没有任何错误。 – cpd1

+0

这是一个容易犯的错误,hadoop将一直使用Reduce类中的基本实现,它只是将数据传递过去而不修改它。 –