0
如何实现自定义输入格式(记录阅读器)。当键是一个文本和值是文本(段落)在hadoop中自定义输入格式并缩小地图
如何实现自定义输入格式(记录阅读器)。当键是一个文本和值是文本(段落)在hadoop中自定义输入格式并缩小地图
扩展FileInputFormat在你想要使自定义输入格式的类,如果你的类名是CustomTextInputFormat,并且你的输入数据文件的每一行都有确切的两个值其是昏迷(,)分隔,并且要采取第一个为键和第二个作为然后在下面的值是代码
public class CustomTextInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit input,
TaskAttemptContext ctx) throws IOException, InterruptedException {
// TODO Auto-generated method stub
CustomTextRecordReader reader=new CustomTextRecordReader();
reader.initialize(input, ctx);
return reader;
}
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;<br>
import org.apache.hadoop.fs.FSDataInputStream;<br>
import org.apache.hadoop.fs.FileSystem;<br>
import org.apache.hadoop.fs.Path;<br>
import org.apache.hadoop.io.Text;<br>
import org.apache.hadoop.mapreduce.InputSplit;<br>
import org.apache.hadoop.mapreduce.RecordReader;<br>
import org.apache.hadoop.mapreduce.TaskAttemptContext;<br>
import org.apache.hadoop.mapreduce.lib.input.FileSplit;<br>
import org.apache.hadoop.util.LineReader;<br>
public class CustomTextRecordReader extends RecordReader<Text, Text> {
LineReader lineReader;
Text key;
Text line=new Text();
Text value;
Configuration conf;
FileSplit split;
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
try{
int status=lineReader.readLine(line);
if(status<=0){
return false;
}
String[] str =line.toString().split(",");
if(str.length!=2){
throw new Exception("Illegal number of arguments");
}
key = new Text(str[0].trim());
value = str[1];
}catch(Exception e){
e.printStackTrace();
}
return true;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext ctx)
throws IOException, InterruptedException {
this.split=(FileSplit)split;
// TODO Auto-generated method stub
conf = ctx.getConfiguration();
Path path=this.split.getPath();
FileSystem fs=path.getFileSystem(conf);
FSDataInputStream in=null;
try{
in = fs.open(path);
lineReader = new LineReader(in,conf);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
}
基本上在nextKeyValue mehod主逻辑在其中读取来自线该文件,然后您可以应用您自己的逻辑来创建您所需的密钥和值。 对于下面的详细信息是由framefork运行,让您的键和值,并传递到地图的方法
public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }
非常感谢大家指导我解决我的问题。 – 2014-09-25 05:35:58
询问时,请提及你尝试过什么映射器的运行方法的代码,你预计什么发生并发生了什么。 – mechalynx 2014-09-30 03:07:52