2013-02-14 132 views
1

我正在用Java写一个归档程序。将被归档的文件已经驻留在HDFS中。我需要能够将文件从HDFS中的一个位置移动到另一个位置,最后的文件使用Gzip进行压缩。要移动的文件可能非常大,因此使用HDFS API移动它们并对它们进行压缩可能效率很低。所以我想我可以在我的代码中写一个mapreduce作业来为我做这件事。使用MapReduce API使用Gzip压缩在HDFS中复制文件

但是,我一直无法找到任何示例,告诉我如何使用MapReduce API复制这些文件,并使它们以gzip格式输出。实际上,我甚至努力寻找一个程序化的例子来说明如何通过mapreduce复制HDFS中的文件。

任何人都可以介绍一下如何使用MapReduce API完成此任务吗?

编辑:这里是工作的配置代码,我到目前为止,这是改编自帮助,阿马尔给我:

 conf.setBoolean("mapred.output.compress", true); 
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec"); 
     Job job = new Job(conf); 
     job.setJarByClass(LogArchiver.class); 
     job.setJobName("ArchiveMover_"+dbname); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     //job.setMapperClass(IdentityMapper.class); 
     //job.setReducerClass(IdentityReducer.class); 
     job.setInputFormatClass(NonSplittableTextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.setInputPaths(job, new Path(archiveStaging+"/"+dbname+"/*/*")); 
     FileOutputFormat.setOutputPath(job, new Path(archiveRoot+"/"+dbname)); 
     job.submit(); 

这里是类声明NonSplittableTextInputFormat这是LogArchiver类

public class NonSplittableTextInputFormat extends TextInputFormat { 
    public NonSplittableTextInputFormat() { 
    } 

    @Override 
    protected boolean isSplitable(JobContext context, Path file) { 
     return false; 
    } 
} 
+0

不知道这是否可以帮助,但你有没有看过Hadoop Streaming? http://wiki.apache.org/hadoop/HadoopStreaming – AlexIIP 2013-02-14 17:32:31

+0

它可能很好,我需要诉诸于此,但我更喜欢通过MapReduce API来做到这一点。 – 2013-02-14 18:04:44

回答

0

你可以写与IdentityMapperIdentityReducer一个custom jar implementation。 除了纯文本文件,您可以生成gzip文件作为输出。设置在run()以下配置:

conf.setBoolean("mapred.output.compress", true); 
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec"); 

为了确保数量在输入和输出文件都相同,只是输出文件必须用gzip压缩,你必须做两件事情:

  1. 实现不可分割文本输入格式
  2. 将减少任务设置为零。

为了确保每个映射器,一个读取文件时,你可能会延长TextInputFormat如下:

import org.apache.hadoop.fs.*; 
import org.apache.hadoop.mapred.TextInputFormat; 
public class NonSplittableTextInputFormat extends TextInputFormat { 
    @Override 
    protected boolean isSplitable(FileSystem fs, Path file) { 
     return false; 
    } 
} 

,并使用上面的实现为:

job.setInputFormatClass(NonSplittableTextInputFormat.class); 

设置减少任务为零,请执行以下操作:

job.setNumReduceTasks(0); 

这会为你完成这项工作,但最后一件事是文件名不会相同!但我也相信,这里也必须有一个解决方法。

+0

太好了。我会试试这个。再次感谢! – 2013-02-14 18:46:50

+0

看看更新的答案是否有帮助。 – Amar 2013-02-14 18:48:40

+0

你确实让我走向了我的目标。我现在只有两个问题。1)假设我有这样的目录结构: top_level/second_level/third_level/file, top_level/second_level/third_level2/file 我怎样才能使它与输入的目录结构匹配?现在它只是将所有输出文件放在一个目录中。 2)输出文件的每行似乎都有一个数字。它随着每一行递增。几乎看起来像一个字符数。我如何确保该号码不会输出? – 2013-02-14 21:09:47