2014-09-19 28 views

回答

0

在你想要作为自定义输入格式的类中扩展 FileInputFormat。假设你的类名是 CustomTextInputFormat,并且输入数据文件的每一行都恰好有两个以逗号(,)分隔的值,你想把第一个值作为键、第二个值作为值,那么代码如下:

/**
 * Input format that hands out {@link CustomTextRecordReader} instances, which
 * produce {@code Text} keys and {@code Text} values from the input files.
 */
public class CustomTextInputFormat extends FileInputFormat<Text, Text> {

    /**
     * Creates the record reader for one input split.
     *
     * <p>The reader is returned un-initialized on purpose: the MapReduce
     * framework calls {@code RecordReader.initialize(split, context)} itself
     * before the split is used. Initializing it here as well would open the
     * underlying file twice and leave the first stream unclosed.
     *
     * @param input the split the reader will be asked to read
     * @param ctx   the task attempt context
     * @return a new, not yet initialized, {@link CustomTextRecordReader}
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit input,
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        return new CustomTextRecordReader();
    }

}


import java.io.IOException; 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Record reader for text files whose lines have the form {@code key,value}.
 *
 * <p>Each line is split on {@code ","}; the first field (trimmed) becomes the
 * key and the second field becomes the value. A line that does not yield
 * exactly two fields is reported as an {@link IOException} instead of being
 * silently turned into a record.
 *
 * <p>The reader honours the byte range of its {@link FileSplit}: a split that
 * does not begin at offset 0 discards its first (possibly partial) line, and
 * every split keeps reading until it has consumed the line that crosses or
 * starts at its end offset. This way each line of a file that is divided into
 * several splits is produced exactly once.
 */
public class CustomTextRecordReader extends RecordReader<Text, Text> {

    LineReader lineReader;
    Text key;
    Text line = new Text();
    Text value;
    Configuration conf;
    FileSplit split;

    /** Byte offset in the file at which this split begins. */
    private long start;
    /** Byte offset in the file at which this split ends. */
    private long end;
    /** Byte offset in the file of the next line to be read. */
    private long pos;

    /**
     * Reads the next line of the split and parses it into the current key and value.
     *
     * @return {@code true} if a record was read, {@code false} when the split
     *         (or the file) is exhausted or the reader is not open
     * @throws IOException if reading fails or the line does not contain exactly
     *                     two comma-separated fields
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // A line that starts exactly at 'end' still belongs to this split,
        // because the following split always skips its first line.
        if (lineReader == null || pos > end) {
            return false;
        }

        int consumed = lineReader.readLine(line);
        if (consumed <= 0) {
            return false;
        }
        pos += consumed;

        String[] fields = line.toString().split(",");
        if (fields.length != 2) {
            throw new IOException("Illegal number of arguments: expected 2 comma-separated"
                    + " values but found " + fields.length + " in line \"" + line + "\"");
        }

        // Fresh Text objects per record, so callers may keep references to them.
        key = new Text(fields[0].trim());
        value = new Text(fields[1]);
        return true;
    }

    /**
     * Opens the file behind the split and positions the reader on the first
     * line that belongs to it.
     *
     * <p>Calling this method again releases the previously opened stream first.
     *
     * @param split the split to read; must be a {@link FileSplit}
     * @param ctx   the task attempt context supplying the configuration
     * @throws IOException if the file cannot be opened or positioned
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        close();

        this.split = (FileSplit) split;
        conf = ctx.getConfiguration();
        start = this.split.getStart();
        end = start + this.split.getLength();

        Path path = this.split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path);
        boolean opened = false;
        try {
            in.seek(start);
            lineReader = new LineReader(in, conf);
            pos = start;
            if (start != 0) {
                // The first line of a non-initial split is read by the split before it.
                pos += lineReader.readLine(new Text());
            }
            opened = true;
        } finally {
            if (!opened) {
                // Do not leak the stream, and do not leave a half-initialized reader behind.
                lineReader = null;
                in.close();
            }
        }
    }

    /**
     * Closes the line reader and its underlying stream, if one is open.
     * Calling it when nothing is open does nothing.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException {
        if (lineReader != null) {
            try {
                lineReader.close();
            } finally {
                lineReader = null;
            }
        }
    }

    /**
     * @return the key parsed by the most recent successful {@link #nextKeyValue()},
     *         or {@code null} if no record has been read yet
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * @return the value parsed by the most recent successful {@link #nextKeyValue()},
     *         or {@code null} if no record has been read yet
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Reports how much of the split's byte range has been consumed.
     *
     * @return a value between 0 and 1; 0 for an empty split or before initialization
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (end == start) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

}

基本上,主要逻辑位于 nextKeyValue 方法中:它从文件中读取一行,然后你可以应用自己的逻辑来创建所需的键和值。下面是由框架(framework)运行的代码,它获取你的键和值,并将它们传递给 map 方法:

 
/**
 * Drives the processing of one input: calls {@code setup}, then {@code map}
 * once for every key/value pair that {@code context.nextKeyValue()} reports,
 * and finally {@code cleanup}.
 *
 * @param context supplies the key/value pairs and is passed on to
 *                {@code setup}, {@code map} and {@code cleanup}
 * @throws IOException          if reading a record or one of the callbacks fails
 * @throws InterruptedException if the processing is interrupted
 */
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        // Run cleanup even when reading or mapping a record throws, so that
        // whatever setup() acquired is still released.
        cleanup(context);
    }
}
+0

非常感谢大家指导我解决我的问题。 – 2014-09-25 05:35:58