2012-07-12 31 views
4

我试图用hadoop实现一些Map/Reduce作业来完成大学作业。但目前我完全停留在实现自定义FileInputFormat类以从文件中将整个内容导入到映射器中。Hadoop:实现自定义FileInputFormat类时需要的帮助

我把这个例子从“hadoop:权威指南”中拿出来,没有任何改变。我可以编译我的源代码,但如果我运行它,它会抛出此异常(目前我使用的Hadoop 1.0.2在Debian 5.0)

Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>() 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115) 
    at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575) 
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:989) 
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:981) 
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093) 
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850) 
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824) 
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261) 
    at org.myorg.ExampleFileInputFormat.run(ExampleFileInputFormat.java:163) 
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) 
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) 
    at org.myorg.ExampleFileInputFormat.main(ExampleFileInputFormat.java:172) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156) 
Caused by: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>() 
    at java.lang.Class.getConstructor0(Class.java:2706) 
    at java.lang.Class.getDeclaredConstructor(Class.java:1985) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:109) 
    ... 21 more 

我有点沮丧,因为我不明白发生了什么,我没有发现任何使用网络搜索。也许你们中的一些人可以看看我的来源。目前为了调试目的,这种方法很简单。


package org.myorg; 
/* 
* 
* 
*/ 
import java.io.IOException; 
import java.util.*; 
import java.io.*; 
import java.util.regex.Pattern; 
import java.util.regex.Matcher; 

import org.apache.hadoop.fs.*; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.util.*; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.conf.Configured; 


public class ExampleFileInputFormat extends Configured implements Tool { 

    /* 
     * <generics> 
     */ 
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { 

     @Override 
     protected boolean isSplitable(FileSystem fs, Path filename) { 
      return false; 
     } 

     @Override 
     public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { 
      return new WholeFileRecordReader((FileSplit) split, job); 
     } 
    } 

    public class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> { 

     private FileSplit fileSplit; 
     private Configuration conf; 
     private boolean processed = false; 

     public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException { 
      this.fileSplit = fileSplit; 
      this.conf = conf; 
     } 

     @Override 
     public NullWritable createKey() { 
      return NullWritable.get(); 
     } 

     @Override 
     public BytesWritable createValue() { 
      return new BytesWritable(); 
     } 

     @Override 
     public long getPos() throws IOException { 
      return processed ? fileSplit.getLength() : 0; 
     } 

     @Override 
     public float getProgress() throws IOException { 
      return processed ? 1.0f : 0.0f; 
     } 

     @Override 
     public boolean next(NullWritable key, BytesWritable value) throws IOException { 
      if (!processed) { 
        byte[] contents = new byte[(int) fileSplit.getLength()]; 
        Path file = fileSplit.getPath(); 
        FileSystem fs = file.getFileSystem(conf); 
        FSDataInputStream in = null; 
        try { 
        in = fs.open(file); 
        IOUtils.readFully(in, contents, 0, contents.length); 
        value.set(contents, 0, contents.length); 
        } finally { 
        IOUtils.closeStream(in); 
        } 
        processed = true; 
        return true; 
      } 
      return false; 
     } 

     @Override 
     public void close() throws IOException { 
      // do nothing 
     } 
    } 
     /* </generics> */ 


    /* 
    * <Task1>: 
    * */ 
    public static class ExampleMap extends MapReduceBase implements Mapper<NullWritable, BytesWritable, Text, IntWritable> { 
    private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 
     public void map(NullWritable key, BytesWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
      output.collect(new Text("test"), one); 
     } 
     } 
    public static class ExampleReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     output.collect(key, new IntWritable(sum)); 
     } 
    } 
    /* </Task1> */ 
    /* 
    * <run> 
    **/ 
    public int run(String[] args) throws Exception { 

     if (args.length != 3) { 
      printUsage(); 
      return 1; 
     } 

     String useCase = args[0]; 
     String inputPath = args[1]; 
     String outputPath = args[2]; 

     deleteOldOutput(outputPath);   


     JobConf conf = new JobConf(ExampleFileInputFormat.class); 
     FileOutputFormat.setOutputPath(conf, new Path(outputPath));  
     FileInputFormat.setInputPaths(conf, new Path(inputPath)); 

     /* conf: Task1 */ 
     if (useCase.equals("cc_p")) { 
      conf.setJobName("WordCount"); 
      /* Output: Key:Text -> Value:Integer */ 
      conf.setOutputKeyClass(Text.class); 
      conf.setOutputValueClass(IntWritable.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      /* Input: Key.Text -> Value:Text */ 
      conf.setInputFormat(WholeFileInputFormat.class); 
      conf.setMapperClass(ExampleMap.class); 
      conf.setReducerClass(ExampleReduce.class); 
     } 
     /* default-option: Exit */ 
     else { 
      printUsage(); 
      return 1; 
     } 

     JobClient.runJob(conf); 
     return 0; 
    } 
    /* </run> */ 

    /* 
    * <Main> 
    */ 
    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new ExampleFileInputFormat(), args); 
     System.exit(res); 
    } 
    /* </Main> */ 

    /* 
    * <Helper> 
    */ 
    private void printUsage() { 
     System.out.println("usage: [usecase] [input-path] [output-path]"); 
     return; 
    } 

    private void deleteOldOutput(String outputPath) throws IOException { 
     // Delete the output directory if it exists already 
     Path outputDir = new Path(outputPath); 
     FileSystem.get(getConf()).delete(outputDir, true); 
    } 
    /* </Helper-> */ 
} 

谁能帮助我吗?来自德国

问候, 亚历

回答

4

你需要做的内部类的静态:

public static class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { 

否则javac的generats期望一个父ExampleFileInputFormat类实例在传递一个构造

+0

的确,这很明显。从未想过这件事。非常感谢。你救了我但是;-) – la3mmchen 2012-07-12 19:35:46

相关问题