2015-05-05 28 views
7

我有一个任务,它将avro输出写入由输入记录的几个字段组织的多个目录中。推测性执行Hadoop多个输出

 
For example : 
Process records of countries across years 
and write in a directory structure of country/year 
eg: 
outputs/usa/2015/outputs_usa_2015.avro 
outputs/uk/2014/outputs_uk_2014.avro 
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context); 
.... 
.... 
    multipleOutputs.write("output", avroKey, NullWritable.get(), 
      OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear()); 

哪些输出commiter将下面的代码使用写output.Is它并不安全与推测执行使用吗? 随着推测执行这引起(可能)org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException

在这个后 Hadoop Reducer: How can I output to multiple directories using speculative execution? 建议使用自定义输出的提交

在下面的代码从Hadoop的AvroMultipleOutputs没有说明投机性执行

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, 
      String baseFileName) throws IOException, InterruptedException { 

    writer = 
       ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), 
        taskContext.getConfiguration())).getRecordWriter(taskContext); 
... 
} 

任何问题,无论是否写入方法的文档的任何问题,如果baseoutput路径是工作目录外

public void write(String namedOutput, Object key, Object value, String baseOutputPath) 

当在作业目录外写入时,AvroMultipleOutputs(其他输出)是否存在与推测执行相关的实际问题? 如果,那我怎么超越AvroMultipleOutputs有它自己的输出committer.I看不到里面AvroMultipleOutputs,其输出的提交它采用

+0

您是否编写了自己的实现?我有同样的问题 – tesnik03

+0

W如果你说“通过推测执行,这可能导致(可能会导致)org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException”,你是否看到过这种记录在任何地方,或者你是否从经验中发言。我们看到了相同的行为,但在使用多个输出时未发现任何禁用推测执行的显式引用。 – ioss

+0

是的,它被记录。这里有个警告http://archive.cloudera.com/cdh5/cdh/5/hadoop/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html – bl3e

回答

1

AvroMultipleOutputs将使用OutputFormat您已注册作业配置,同时添加一个名为输出的任何OUTPUTFORMAT例如使用来自AvroMultipleOutputsaddNamedOutput API(例如AvroKeyValueOutputFormat)。

由于AvroMultipleOutputs,您可能无法使用推测任务执行功能。即使压倒一切也无济于事,也不会简单。

相反,你应该写自己OutputFormat(最有可能延长现有的Avro输出格式例如AvroKeyValueOutputFormat一个),并覆盖/实现其getRecordWriter API,它会返回一个RecordWriter例如说MainRecordWriter(仅供参考)。

这个MainRecordWriter将保持RecordWriter(例如AvroKeyValueRecordWriter)实例的地图。这些RecordWriter实例中的每一个都属于输出文件之一。在write API MainRecordWriter中,您将从地图中获得实际的RecordWriter实例(根据您要写入的记录),然后使用该记录写入器写入记录。所以MainRecordWriter只会作为多个RecordWriter实例的封装工作。

对于一些类似的实现,您可能喜欢从piggybank库研究MultiStorage类的代码。

0

当你命名输出添加到AvroMultipleOutputs,它会调用要么AvroKeyOutputFormat.getRecordWriter()AvroKeyValueOutputFormat.getRecordWriter(),其中呼吁AvroOutputFormatBase.getAvroFileOutputStream(),其内容是

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { 
    Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(), 
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT)); 
    return path.getFileSystem(context.getConfiguration()).create(path); 
} 

而且AvroOutputFormatBase延伸FileOutputFormat(以上述方法getOutputCommitter()事实上是一个请致电FileOutputFormat.getOutputCommitter()因此,AvroMultipleOutputs应与MultipleOutputs具有相同的约束条件