
MapReduce - reducer emits output on a single line

I have a simple MapReduce job that is supposed to read a dictionary from a text file and then process another large file line by line to compute an inverse document matrix. The output should look like this:

word-id1 docX:tfX docY:tfY 
word-id2 docX:tfX docY:tfY etc... 

However, the output of the reducer is emitted as one huuuge line. I don't understand why it does not emit a new line for each word-id (which is the reducer's key).

The mapper produces correct output (the word-id and doc-id:tf pairs are on separate lines); I tested this by running without the reducer. The reducer is supposed to append, for each key, all values belonging to that key onto a single line.

Could you please have a look at my code (especially the reducer and the job configuration) and tell me why the reducer emits just one huge line instead of one line per key? I have spent hours debugging this and cannot get my head around it.

public class Indexer extends Configured implements Tool { 

    /* 
    * Vocabulary: key = term, value = index 
    */ 
    private static Map<String, Integer> vocab = new HashMap<String, Integer>(); 

    public static void main(String[] arguments) throws Exception { 
     System.exit(ToolRunner.run(new Indexer(), arguments)); 
    } 

    public static class Comparator extends WritableComparator { 
     protected Comparator() { 
      super(Text.class, true); 
     } 

     @Override 
     public int compare(WritableComparable a, WritableComparable b) { 
      // Here we exploit the implementation of compareTo(...) in
      // Text.class.
      return -a.compareTo(b); 
     } 
    } 

    public static class IndexerMapper extends 
      Mapper<Object, Text, IntWritable, Text> { 
     private Text result = new Text(); 

     // load vocab from distributed cache 
     public void setup(Context context) throws IOException { 
      Configuration conf = context.getConfiguration(); 
      FileSystem fs = FileSystem.get(conf); 
      URI[] cacheFiles = DistributedCache.getCacheFiles(conf); 
      Path getPath = new Path(cacheFiles[0].getPath()); 

      BufferedReader bf = new BufferedReader(new InputStreamReader(
        fs.open(getPath))); 
      String line = null; 
      while ((line = bf.readLine()) != null) { 
       StringTokenizer st = new StringTokenizer(line, " \t"); 

       int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id 
       String word = st.nextToken(); // second element is the term 

       // save vocab 
       vocab.put(word, index); 

      } 
     } 

     public void map(Object key, Text value, Context context) 
       throws IOException, InterruptedException { 

      // init TF map 
      Map<String, Integer> mapTF = new HashMap<String, Integer>(); 

      // parse input string 
      StringTokenizer st = new StringTokenizer(value.toString(), " \t"); 

      // first element is doc index 
      int index = Integer.parseInt(st.nextToken()); 

      // count term frequencies 
      String word; 
      while (st.hasMoreTokens()) { 
       word = st.nextToken(); 

       // check if word is in the vocabulary 
       if (vocab.containsKey(word)) { 
        if (mapTF.containsKey(word)) { 
         int count = mapTF.get(word); 
         mapTF.put(word, count + 1); 
        } else { 
         mapTF.put(word, 1); 
        } 
       } 
      } 

      // compute TF-IDF 
      int wordIndex; 
      for (String term : mapTF.keySet()) { 
       int tf = mapTF.get(term); 

       if (vocab.containsKey(term)) { 
        wordIndex = vocab.get(term); 

        context.write(new IntWritable(wordIndex), new Text(index + ":" + tf)); 
       } 

      }    
     } 
    } 

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> 
    { 
     @Override 
     public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
     { 

      StringBuilder sb = new StringBuilder(16000); 

      for (Text value : values) 
      { 
       sb.append(value.toString() + " "); 
      } 


      context.write(key, new Text(sb.toString())); 
     } 
    } 

    /** 
    * This is where the MapReduce job is configured and being launched. 
    */ 
    @Override 
    public int run(String[] arguments) throws Exception { 
     ArgumentParser parser = new ArgumentParser("TextPreprocessor"); 

     parser.addArgument("input", true, true, "specify input directory"); 
     parser.addArgument("output", true, true, "specify output directory"); 

     parser.parseAndCheck(arguments); 

     Path inputPath = new Path(parser.getString("input")); 
     Path outputDir = new Path(parser.getString("output")); 

     // Create configuration. 
     Configuration conf = getConf(); 

     // add distributed file with vocabulary 
     DistributedCache 
       .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf); 

     // Create job. 
     Job job = new Job(conf, "WordCount"); 
     job.setJarByClass(IndexerMapper.class); 

     // Setup MapReduce. 
     job.setMapperClass(IndexerMapper.class); 
     //job.setCombinerClass(IndexerReducer.class); 
     job.setReducerClass(IndexerReducer.class); 

     // Sort the output words in reversed order. 
     job.setSortComparatorClass(Comparator.class); 


     job.setNumReduceTasks(1); 

     // Specify (key, value). 
     job.setMapOutputKeyClass(IntWritable.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(IntWritable.class); 
     job.setOutputValueClass(Text.class); 

     // Input. 
     FileInputFormat.addInputPath(job, inputPath); 
     job.setInputFormatClass(TextInputFormat.class); 

     // Output. 
     FileOutputFormat.setOutputPath(job, outputDir); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileSystem hdfs = FileSystem.get(conf); 

     // Delete output directory (if exists). 
     if (hdfs.exists(outputDir)) 
      hdfs.delete(outputDir, true); 

     // Execute the job. 
     return job.waitForCompletion(true) ? 0 : 1; 
    } 
} 

Just to confirm, are you getting distinct keys in the mapper output? Could you also update the sample output? Also check the file in WordPad whether you have delimiters; if your lines are huge you might be overlooking them. –

Yes, I am getting distinct keys from the mapper, that is confirmed... the mapper's output format is key[TAB]value – Smajl

How did you confirm that your mapper output is correct? Did you set the number of reducers to 0? Also, I think you need to cast the objects in your comparator. Just try removing the custom comparator and see whether it makes any difference. –

Answer

Try the following to debug your problem:

  • Set the number of reducers to 0 and see what the mapper output looks like.
  • Try using the default comparator; in your custom comparator you also need to cast the objects, otherwise it will not produce correct results (see the sketch after this list).
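For the first point, job.setNumReduceTasks(0) makes the job map-only, so the mapper's raw key/value pairs are written straight to the output files and can be inspected.

For the second point, note that your comparator is registered for Text (super(Text.class, true)) while the map output key is actually IntWritable. A minimal sketch of what a matching comparator could look like, assuming the reversed sort order is still wanted:

    public static class Comparator extends WritableComparator {
     protected Comparator() {
      // register the comparator for the actual map output key type
      super(IntWritable.class, true);
     }

     @Override
     public int compare(WritableComparable a, WritableComparable b) {
      // cast to the concrete key type before comparing
      IntWritable left = (IntWritable) a;
      IntWritable right = (IntWritable) b;
      // negate to get descending order
      return -left.compareTo(right);
     }
    }

With the key type consistent, each reduce call writes its own record, and TextOutputFormat appends a newline after every context.write(...), which gives one line per key. The quickest check is to remove job.setSortComparatorClass(Comparator.class) entirely and rerun with the default IntWritable ordering to confirm the comparator is the culprit.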