0

我有一组文件说10个文件和一个大文件,这是所有10个文件的总和。阅读许多文件hadoop mapreduce分布式缓存

我把它们放在分布式缓存中,作业conf。

当我看到他们在减少,我遵守以下的事情:

  1. 我读这是在分布式缓存添加在减少方法只选定的文件。我预计速度会更快,因为在每个缩减中读取的文件大小与在所有缩小方法中读取大型文件相比较小。但是,速度较慢。

  2. 此外,当我将它分割成更小的文件并将它们添加到分布式缓存时,问题变得更糟。工作本身在很长一段时间才开始运行。

我无法找到原因。请帮助。

回答

3

我认为你的问题在于阅读reduce()中的文件。您应该阅读configure()(使用旧API)或setup()(使用新API)中的文件。因此,对于每一个减速将只读取一次,而不是读它为每个输入组到减速机(基本上,每次调用减少方法)

您可以编写类似: 使用新的MapReduce API(ORG .apache.hadoop.mapreduce *) -

public static class ReduceJob extends Reducer<Text, Text, Text, Text> { 

    ... 
Path file1; 
Path file2; 
... 

    @Override 
      protected void setup(Context context) throws IOException, InterruptedException { 

       // Get the file from distributed cached 
    file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0]; 
    file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1]; 

       // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap. 
      } 



      @Override 
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
        InterruptedException { 
    ... 
    } 
    } 

使用旧的mapred API(org.apache.hadoop.mapred *) -

public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 

    ... 
Path file1; 
Path file2; 
... 

     @Override 
     public void configure(JobConf job) { 

       // Get the file from distributed cached 
    file1 = DistributedCache.getLocalCacheFiles(job)[0] 
    file2 = DistributedCache.getLocalCacheFiles(job)[1] 
... 

       // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap. 
      } 


@Override 
     public synchronized void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, 
       Reporter reporter) throws IOException { 
    ... 
    } 
    }