2013-07-30 44 views
3

我是Hadoop的新手,但这是我上个月的一个学习项目。Hadoop 1输入文件= 1输出文件,仅限地图

在试图保持这种含糊不清的,是有用的人,让我扔出去的基本目标第一....假设:

  1. 你有一个大的数据集(明显),数以百万计基本的ASCII文本文件。
    • 每个文件都是“记录”。
  2. 的记录被存储在一个目录结构,以确定客户&日期
    • 例如/用户/ hduser /数据/ customer1表/ YYYY-MM-DD,/用户/ hduser /数据/的customer2/YYYY-MM-DD
  3. 你想模仿输入结构的输出结构
    • 例如/用户/ hduser /出/ customer1表/ YYYY-MM-DD,/用户/ hduser /出/的customer2/YYYY-MM-DD

我已经看过多线程:

还有更多..我也一直在阅读汤姆怀特的Hadoop书。我一直在努力学习这一点。而且我经常在新API和旧API之间交换,这增加了尝试学习这一点的困惑。

许多人指出MultipleOutputs(或旧的api版本),但我似乎无法产生我想要的输出 - 例如,MultipleOutputs似乎不接受“/”来创建目录结构写()

需要采取哪些步骤来创建具有所需输出结构的文件? 目前,我有一个WholeFileInputFormat类,以及相关RecordReader具有(NullWritable K,ByteWritable V)对(如果需要的话,可以改变)

我的地图设置:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { 
    private Text filenameKey; 
    private MultipleOutputs<NullWritable, Text> mos; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 
     filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/ 
     mos = new MultipleOutputs(context); 
    } 
} 

还有一个清理()函数调用mos.close()图()功能是目前未知的(我需要帮助这里)

这是足够的信息指出一个新手在答案的方向?我的下一个想法是在每个map()任务中创建一个MultipleOutputs()对象,每个对象都有一个新的baseoutput字符串,但我不确定它是否有效,甚至是正确的操作。

建议将不胜感激,程序中的任何内容都可以改变,除了输入 - 我只是想学习框架 - 但我想尽可能接近这个结果(稍后我可能会考虑将记录结合到更大的文件,但它们已经是每个记录20MB,并且我想确保它在我无法在记事本中读取之前能够正常工作。

编辑:可以通过修改/扩展TextOutputFormat.class?似乎它可能有一些方法可以工作,但我不确定哪些方法我需要重写...

+0

我还没有尝试过,但书“的Hadoop权威指南”说,从最新的API中MultipleOutputs支持使用文件路径分隔符(/)。你是说它不起作用吗? – Rags

+0

@Rags这可能是我执行MultipleOutputs时的一个错误 – Pseudo

回答

5

如果您关闭投机执行,那么我没有什么能够阻止你在你的映射器中手动创建输出文件夹结构/文件,并向它们写入记录(忽略输出上下文/收集器)

例如,扩展片段(setup方法)这(这基本上是什么多个输出是干什么的,但假设推测执行被关闭,以避免在两个地图的任务都试图写入同一个输出文件的文件冲突):

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class MultiOutputsMapper extends 
     Mapper<LongWritable, Text, NullWritable, NullWritable> { 
    protected String filenameKey; 
    private RecordWriter<Text, Text> writer; 
    private Text outputValue; 
    private Text outputKey; 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     // operate on the input record 
     // ... 

     // write to output file using writer rather than context 
     writer.write(outputKey, outputValue); 
    } 

    @Override 
    protected void setup(Context context) throws IOException, 
      InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 

     // extract parent folder and filename 
     filenameKey = path.getParent().getName() + "/" + path.getName(); 

     // base output folder 
     final Path baseOutputPath = FileOutputFormat.getOutputPath(context); 
     // output file name 
     final Path outputFilePath = new Path(baseOutputPath, filenameKey); 

     // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder 
     TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() { 
      @Override 
      public Path getDefaultWorkFile(TaskAttemptContext context, 
        String extension) throws IOException { 
       return outputFilePath; 
      } 
     }; 

     // create a record writer that will write to the desired output subfolder 
     writer = tof.getRecordWriter(context); 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, 
      InterruptedException { 
     writer.close(context); 
    } 
} 

考虑几点:

  • customerx/yyyy-MM-dd路径文件或文件夹(如果是文件夹,则需要相应修改 - 此实现假定每个日期有一个文件,文件名为yyyy-MM-dd)
  • 不妨看看LazyOutputFormat防止空输出映射文件被创建
+0

我使用了你的骨架并从中学到了很多东西......并且作为一种学习工具,你的代码非常优秀** ..你是对的,'yyyy- MM-dd'是另一个文件夹,其中有一个文件。带了我一些玩,但得到它的工作,其中一个棘手的位是输入源需要是'/用户/ hduser /数据/ *'(与明星),因为它将任务映射到子目录中的所有文件。我还在作业配置中实现了'NullOutputFormat'(而不是Lazy),并且在设置时使用'TextOutputFormat'(尽管懒惰是一种方便的格式来了解!)非常感谢Chris的指点! – Pseudo

+0

@Chris,只是澄清,这个问题也不能用MultipleOutputs(new API)解决吗? (使用WholeFileInputFormat(自定义类,isSplittable为false,并使用FileSplit的路径)? – Rags

+0

@Rags,可能但我对尝试做同样的事情有一些模糊的记忆,但是在基本输出路径中存在路径分隔符问题。也许这已经在更新的版本中修复了。当然值得一试 –