2014-07-01 51 views
3

在我的HDFS上,我有一堆gzip文件,我想将其解压缩为正常格式。有这样的API吗?或者我怎么写一个函数来做到这一点?解压Hadoop hdfs目录中的所有Gzip文件

我不想使用任何命令行工具;相反,我想通过编写Java代码来完成这项任务。

回答

5

您需要一个CompressionCodec来解压缩文件。 gzip的实现是GzipCodec。您通过编解码器获得CompressedInputStream,并通过简单IO获得结果。像这样的东西:说你有一个文件file.gz

//path of file 
String uri = "/uri/to/file.gz"; 
Configuration conf = new Configuration(); 
FileSystem fs = FileSystem.get(URI.create(uri), conf); 
Path inputPath = new Path(uri); 

CompressionCodecFactory factory = new CompressionCodecFactory(conf); 
// the correct codec will be discovered by the extension of the file 
CompressionCodec codec = factory.getCodec(inputPath); 

if (codec == null) { 
    System.err.println("No codec found for " + uri); 
    System.exit(1); 
} 

// remove the .gz extension 
String outputUri = 
    CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); 

InputStream is = codec.createInputStream(fs.open(inputPath)); 
OutputStream out = fs.create(new Path(outputUri)); 
IOUtils.copyBytes(is, out, conf); 

// close streams 

UPDATE

如果你需要得到所有目录中的文件时,你应该得到的FileStatus ES样

FileSystem fs = FileSystem.get(new Configuration()); 
FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir")); 

然后只是循环

for (FileStatus status: statuses) { 
    CompressionCodec codec = factory.getCodec(status.getPath()); 
    ... 
    InputStream is = codec.createInputStream(fs.open(status.getPath()); 
    ... 
} 
+0

所以实际上我所有的gziped的文件存储在hdfs的目录中,并且一个目录内有一堆文件。我想通过解压缩目录中的每个文件,然后将结果文件存储在新目录中。要获取目录中的文件列表,我使用代码List )FileUtils.listFiles(temporaryDirectory,null,true); 那么上面的代码如何改变以匹配这个。对不起,但我对这一切感到困惑,这就是为什么我问。谢谢您的帮助。 – user3690321

+0

查看我的**更新** –

+0

所以我的OutputStream仍然是: OutputStream out = fs.create(new Path(“hdfs/ouput”​​)); 是否正确?所以这将继续将我的InputStream文件复制到OuputStream路径?或者我错了? – user3690321

1

我使用标识映射Hadoop的工作,我在滚烫写道改变压缩/分的变化大小/等

class IdentityMap(args: Args) extends ConfiguredJob(args) { 
    CombineFileMultipleTextLine(args.list("in"): _*).read.mapTo[String, String]('line -> 'line)(identity) 
    .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out"))) 
} 

常规配置抽象类:

abstract class ConfiguredJob(args: Args) extends Job(args) { 
    override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = { 
    val Megabyte = 1024 * 1024 
    val conf = super.config(mode) 
    val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte 
    val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte 
    val jobPriority = args.getOrElse("jobPriority","NORMAL") 
    val maxHeap = args.getOrElse("maxHeap","512m") 
    conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap), 
     "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"), 
     "mapred.min.split.size" -> splitSizeMin.toString, 
     "mapred.max.split.size" -> splitSizeMax.toString, 
//  "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), //Does not work, has to be -D flag 
     "mapred.job.priority" -> jobPriority) 
    } 
} 
相关问题