
I need to implement a custom I/O format based on the ORCFile I/O format. How do I go about this? How do I use the ORCFile input/output formats in MapReduce?

Specifically, I need a way to include the ORCFile library in my source code (a custom Pig implementation), use the ORCFile output format to write data, and then use the ORCFile input format to read it back.

Answers


You need to create a subclass of the InputFormat class (or of FileInputFormat, depending on the nature of your files).

Just Google for "Hadoop InputFormat" and you will find plenty of articles and tutorials on how to create your own InputFormat class. A minimal skeleton of such a subclass is sketched below.
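A minimal sketch under stated assumptions: the class name is made up, LineRecordReader merely stands in for a reader you would build on the ORC classes, and it uses the old org.apache.hadoop.mapred API that the sample further down this page also uses.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyOrcBackedInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Stand-in reader: a real ORC-backed format would return a
        // RecordReader built on the ORC reader classes instead.
        return new LineRecordReader(job, (FileSplit) split);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        // Keep files whole unless your reader can start mid-file;
        // ORC readers work in terms of stripe boundaries.
        return false;
    }
}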


Comment: Would writing map-reduce output in the ORC file format improve performance? (just a plain MapReduce job, no Hive involved)


You can use the HCatalog library to read and write ORC files in MapReduce. A short sketch of the read side follows.
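A minimal sketch, assuming an existing Hive/HCatalog table default.my_orc_table stored as ORC whose second column is a string; the table name, column index and output path are illustrative, and older HCatalog releases use the org.apache.hcatalog.* packages instead of org.apache.hive.hcatalog.*.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class HCatOrcRead {

    public static class ReadMapper
            extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // HCatalog has already applied the ORC SerDe: fields arrive
            // as plain Java objects, addressed by position.
            context.write(new Text(value.get(1).toString()), new IntWritable(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "read-orc-via-hcatalog");
        job.setJarByClass(HCatOrcRead.class);

        // Point the job at the Hive/HCatalog table; storage details
        // (ORC, location, schema) come from the metastore.
        HCatInputFormat.setInput(job, "default", "my_orc_table");
        job.setInputFormatClass(HCatInputFormat.class);

        job.setMapperClass(ReadMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("./target/hcat-out"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}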


I just wrote some sample code here. Hope it helps. Sample mapper code:

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public static class MyMapper<K extends WritableComparable, V extends Writable>
        extends MapReduceBase implements Mapper<K, OrcStruct, Text, IntWritable> {

    // 0-based indices of the columns in the table schema "a,b,c" ...
    private static final int B_ID = 1;
    private static final int C_ID = 2;
    // ... and of the fields d and e inside the nested struct column c.
    private static final int D_ID = 0;
    private static final int E_ID = 1;

    private StructObjectInspector oip;
    private final OrcSerde serde = new OrcSerde();

    public void configure(JobConf job) {
        // Describe the table schema to the SerDe.
        Properties table = new Properties();
        table.setProperty("columns", "a,b,c");
        table.setProperty("columns.types", "int,string,struct<d:int,e:string>");

        serde.initialize(job, table);

        try {
            oip = (StructObjectInspector) serde.getObjectInspector();
        } catch (SerDeException e) {
            e.printStackTrace();
        }
    }

    public void map(K key, OrcStruct val,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        System.out.println(val); // debug: print the whole row
        List<? extends StructField> fields = oip.getAllStructFieldRefs();

        // Column b is a plain string.
        StringObjectInspector bInspector =
                (StringObjectInspector) fields.get(B_ID).getFieldObjectInspector();
        String b = "garbage";
        try {
            b = bInspector.getPrimitiveJavaObject(
                    oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID)));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }

        // Column c is a nested struct<d:int,e:string>; pull it out first.
        OrcStruct struct = null;
        try {
            struct = (OrcStruct) oip.getStructFieldData(serde.deserialize(val), fields.get(C_ID));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }
        StructObjectInspector cInspector =
                (StructObjectInspector) fields.get(C_ID).getFieldObjectInspector();
        // Use the nested inspector's own field refs, not the top-level ones.
        List<? extends StructField> cFields = cInspector.getAllStructFieldRefs();
        int d = ((IntWritable) cInspector.getStructFieldData(struct, cFields.get(D_ID))).get();
        String e = cInspector.getStructFieldData(struct, cFields.get(E_ID)).toString();

        output.collect(new Text(b), new IntWritable(1));
        output.collect(new Text(e), new IntWritable(1));
    }
}

Driver code:

JobConf job = new JobConf(new Configuration(), OrcReader.class); 

    // Specify various job-specific parameters  
    job.setJobName("myjob"); 
    job.set("mapreduce.framework.name","local"); 
    job.set("fs.default.name","file:///"); 
    job.set("log4j.logger.org.apache.hadoop","INFO"); 
    job.set("log4j.logger.org.apache.hadoop","INFO"); 

    //push down projection columns 
    job.set("hive.io.file.readcolumn.ids","1,2"); 
    job.set("hive.io.file.read.all.columns","false"); 
    job.set("hive.io.file.readcolumn.names","b,c"); 

    FileInputFormat.setInputPaths(job, new Path("./src/main/resources/000000_0.orc")); 
    FileOutputFormat.setOutputPath(job, new Path("./target/out1")); 

    job.setMapperClass(OrcReader.MyMapper.class); 
    job.setCombinerClass(OrcReader.MyReducer.class); 
    job.setReducerClass(OrcReader.MyReducer.class); 


    job.setInputFormat(OrcInputFormat.class); 
    job.setOutputFormat(TextOutputFormat.class); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 

    JobClient.runJob(job);
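For the write side of the original question, one option that sidesteps the OutputFormat plumbing entirely is the ORC Writer API that ships in the same hive-exec jar. A minimal sketch; the row class, values and output path here are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class OrcWriterExample {

    // One row of the file: the field names and types of this class
    // become the ORC schema via reflection.
    static class MyRow {
        int a;
        String b;
        MyRow(int a, String b) { this.a = a; this.b = b; }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Derive an ObjectInspector for MyRow via reflection.
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
                MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

        Writer writer = OrcFile.createWriter(new Path("./target/out.orc"),
                OrcFile.writerOptions(conf).inspector(inspector));
        writer.addRow(new MyRow(1, "hello"));
        writer.addRow(new MyRow(2, "world"));
        writer.close();
    }
}

The file written this way can then be read back with OrcInputFormat as in the driver above.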