2017-09-26 26 views
1

我需要知道我正在使用的输入文件的分区的行索引。我可以通过将行索引连接到数据在原始文件中强制执行此操作,但我宁愿在Hadoop中执行此操作。我在映射器中有这个...在Hadoop中获取输入文件的分区ID

String id = context.getConfiguration().get("mapreduce.task.partition"); 

但是在任何情况下“id”都是0。在“Hadoop:权威指南”中,它提到访问属性,如分区ID“可以通过传递给Mapper或Reducer的所有方法的上下文对象来访问”。从我所知道的来看,它并没有真正涉及如何获取这些信息。

我浏览了Context对象的文档,看起来上面是这样做的方法,脚本也会编译。但是因为每个价值都是0,所以我不确定我是否真的使用了正确的东西,但我无法在网上找到任何可以帮助我们搞清楚的细节。

代码用来测试...

public class Test { 

public static class TestMapper extends Mapper<LongWritable, Text, Text, Text> { 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String id = context.getConfiguration().get("mapreduce.task.partition"); 
     context.write(new Text("Test"), new Text(id + "_" + value.toString())); 
    } 
} 


public static class TestReducer extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 

     for(Text value : values) { 
      context.write(key, value); 
     } 
    } 
} 


public static void main(String[] args) throws Exception { 

    if(args.length != 2) { 
     System.err.println("Usage: Test <input path> <output path>"); 
     System.exit(-1); 
    } 

    Job job = new Job(); 
    job.setJarByClass(Test.class); 
    job.setJobName("Test"); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(TestMapper.class); 
    job.setReducerClass(TestReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

我不清楚关于输入文件分区的行索引究竟是什么意思。你能澄清吗? –

+0

@BinaryNerd我可能是错的,但我认为这将是输入文件的行ID。所以说如果文件中有100行,我希望知道映射器正在工作的当前行是什么(所以从0-99或1-100的数字) – cpd1

回答

1

两个选项:

  1. 使用偏移而不是行号
  2. 轨迹映射器中的行号

对于第一个,关键是LongWritable告诉你正在进行的线的偏移量ssed。除非您的线条长度完全相同,否则您将无法从偏移量计算线条编号,但它确实可以让您确定排序是否有用。

第二个选项是在映射器中跟踪它。你可以更改您的代码是这样的:

public static class TestMapper extends Mapper<LongWritable, Text, Text, Text> { 

    private long currentLineNum = 0; 
    private Text test = new Text("Test"); 

    public void map(LongWritable key, Text value, Context context) 
          throws IOException, InterruptedException { 

     context.write(test, new Text(currentLineNum + "_" + value)); 
     currentLineNum++; 
    } 
} 
+0

嗯。我可以使用偏移量。我看到它是6的倍数,而且行数应该是相同的长度。对于上面提供的内容,它将如何知道当前行是什么?我的假设是,制图人员正在同时工作,所以上面的计数器可能没有合适的行数。例如,如果要完成的第一个映射器是第5行的映射器,那么当前的行号不会是1吗? – cpd1

+0

映射器的每个实例都将按顺序处理文件中的行/分割其工作。如果你有多个mapper运行,他们将分别处理他们自己的分割。没有对发生的文件的并发访问,因此您可以使用上述简单的方法跟踪该行。你需要确保你的输入不会分裂,所以使用gz压缩之类的东西。 –

+0

明白了。非常感谢你。我认为它会是并发的,所以变量会关闭,但只是在一个大的数据集上进行测试,并且完全按照您提到的方式工作。感谢所有的帮助。 – cpd1

0

你也可以代表你的矩阵元组的线路,包括对每一个元组,所以当你的文件正在阅读的行和山坳,你有信息。如果您使用的空间或逗号分隔的文件组成一个二维数组,那么很难弄清楚您在映射器中正在处理的行(行)