2014-11-04 45 views

How to format the output written by MapReduce in Hadoop

I am trying to reverse the contents of a file word by word. The program runs fine, but the output I get looks like this:

1 dwp 
2 seviG 
3 eht 
4 tnerruc 
5 gnikdrow 
6 yrotcerid 
7 ridkm 
8 desU 
9 ot 
10 etaerc 

The output I want is like this:

dwp seviG eht tnerruc gnikdrow yrotcerid ridkm desU 
ot etaerc 
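The core transformation here is per-word character reversal. A minimal plain-Java sketch of what the mapper does to each line (outside Hadoop, with a hypothetical helper name):

```java
import java.util.StringTokenizer;

public class WordReverser {
    // Reverse the characters of every whitespace-separated word in a line,
    // keeping the word order intact.
    public static String reverseWords(String line) {
        StringBuilder out = new StringBuilder();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            if (out.length() > 0) {
                out.append(' ');
            }
            out.append(new StringBuilder(tokenizer.nextToken()).reverse());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(reverseWords("pwd Gives the current")); // dwp seviG eht tnerruc
    }
}
```

The numbers in the unwanted output above do not come from this transformation at all; they are the IntWritable keys the job emits alongside each reversed word.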

The code I am working with:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class Reproduce {

    public static int temp = 0;

    public static class ReproduceMap extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
        private Text word = new Text();

        @Override
        public void map(LongWritable arg0, Text value,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString().concat("\n");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(new StringBuffer(tokenizer.nextToken()).reverse().toString());
                temp++;
                output.collect(new IntWritable(temp), word);
            }
        }
    }

    public static class ReproduceReduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {

        @Override
        public void reduce(IntWritable arg0, Iterator<Text> arg1,
                OutputCollector<IntWritable, Text> arg2, Reporter arg3)
                throws IOException {
            String word = arg1.next().toString();
            Text word1 = new Text();
            word1.set(word);
            arg2.collect(arg0, word1);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Reproduce.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(ReproduceMap.class);
        conf.setReducerClass(ReproduceReduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

How can I modify my output without writing another Java program to do it?

Thanks.

Answers

Answer (4 votes)

Below is a simple piece of code demonstrating a custom output format (note that it uses the newer org.apache.hadoop.mapreduce API, not the mapred API from the question):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTextOutputFormat extends FileOutputFormat<Text, List<IntWritable>> {
    @Override
    public RecordWriter<Text, List<IntWritable>> getRecordWriter(TaskAttemptContext arg0)
            throws IOException, InterruptedException {
        // get the current output path
        Path path = FileOutputFormat.getOutputPath(arg0);
        // create the full path with the output directory plus our filename
        Path fullPath = new Path(path, "result.txt");
        // create the file in the file system
        FileSystem fs = path.getFileSystem(arg0.getConfiguration());
        FSDataOutputStream fileOut = fs.create(fullPath, arg0);

        // create our record writer with the new file
        return new MyCustomRecordWriter(fileOut);
    }
}

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyCustomRecordWriter extends RecordWriter<Text, List<IntWritable>> {
    private DataOutputStream out;

    public MyCustomRecordWriter(DataOutputStream stream) {
        out = stream;
        try {
            out.writeBytes("results:\r\n");
        } catch (Exception ex) {
            // ignore failure to write the header line
        }
    }

    @Override
    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
        // close our file
        out.close();
    }

    @Override
    public void write(Text arg0, List<IntWritable> arg1) throws IOException, InterruptedException {
        // write out our key
        out.writeBytes(arg0.toString() + ": ");
        // loop through all values associated with our key and write them with commas between
        for (int i = 0; i < arg1.size(); i++) {
            if (i > 0)
                out.writeBytes(",");
            out.writeBytes(String.valueOf(arg1.get(i)));
        }
        out.writeBytes("\r\n");
    }
}
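The formatting logic of write() above can be checked outside Hadoop. A small sketch (hypothetical class and method names) that renders one key and its values exactly the way the record writer does:

```java
import java.util.List;

public class RecordFormat {
    // Mirror MyCustomRecordWriter.write(): "key: v1,v2,...\r\n"
    public static String format(String key, List<Integer> values) {
        StringBuilder sb = new StringBuilder(key).append(": ");
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(values.get(i));
        }
        return sb.append("\r\n").toString();
    }

    public static void main(String[] args) {
        System.out.print(format("word", List.of(1, 2, 3))); // word: 1,2,3
    }
}
```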

Finally, before running the job, we need to tell it to use our custom FileOutputFormat and where to write:

job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(ArrayList.class); 
job.setOutputFormatClass(MyTextOutputFormat.class); 
FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/out")); 
Answer (0 votes)

You can use NullWritable as the output value. NullWritable is just a placeholder, since you don't want the numbers to appear as part of the output. I have given the modified reducer class below. Note: you need to add an import statement for org.apache.hadoop.io.NullWritable.

public static class ReproduceReduce extends MapReduceBase implements Reducer<IntWritable, Text, Text, NullWritable> {

    @Override
    public void reduce(IntWritable arg0, Iterator<Text> arg1,
            OutputCollector<Text, NullWritable> arg2, Reporter arg3)
            throws IOException {
        String word = arg1.next().toString();
        Text word1 = new Text();
        word1.set(word);
        // NullWritable has no public constructor; use the singleton instance
        arg2.collect(word1, NullWritable.get());
    }
}

And change the driver class (the main method) accordingly:

conf.setOutputKeyClass(Text.class); 
conf.setOutputValueClass(NullWritable.class); 
Answer (0 votes)

In the mapper, the key temp is incremented for each word, so each word ends up being emitted as a separate key-value pair.

The following steps should fix the problem:

1) In the Mapper, simply remove temp++ so that all reversed words are emitted with the key 0 (temp = 0).

2) The Reducer then receives the key 0 and the list of reversed strings. In the Reducer, set the key to NullWritable and write out the values.
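A plain-Java sketch of the data flow these two steps produce (hypothetical names, no Hadoop runtime): every reversed word is emitted under the constant key 0, so a single reduce call sees all of them in input order and can join them into one line:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class ConstantKeyFlow {
    // "Map" step: emit every reversed word under the same implicit key (0),
    // mimicking the mapper with temp++ removed.
    public static List<String> map(String line) {
        List<String> values = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            values.add(new StringBuilder(tokenizer.nextToken()).reverse().toString());
        }
        return values;
    }

    // "Reduce" step: one call receives all values for key 0 and joins them,
    // which is what the reducer would write out with a NullWritable key.
    public static String reduce(List<String> values) {
        return String.join(" ", values);
    }

    public static void main(String[] args) {
        System.out.println(reduce(map("pwd Gives the current"))); // dwp seviG eht tnerruc
    }
}
```

One caveat with this design: a single constant key funnels all data through one reduce call, so the reduce phase loses any parallelism.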

Answer (1 vote)

We can do this by writing a custom FileOutputFormat class.

Answer (0 votes)

What you can try is to take a constant key (or simply NullWritable) and pass your entire line as the value (you can reverse it in the mapper class, or you can also reverse it in the reducer class). Your reducer will then receive the constant key (or a placeholder, if you used NullWritable as the key) together with the complete line. You can now simply reverse the line and write it to the output file. By not using temp as the key, you avoid writing unwanted numbers into the output file.