2016-03-16

I am trying a custom bulk-load MapReduce job into HBase and ran into a problem with the reducer. At first I assumed I had simply written the reducer badly, but after throwing a runtime exception inside the reducer and seeing the job still complete, I realized the reducer is not running at all. I have checked the usual causes for this kind of question (Hadoop mapreduce - reducer not running) and none of them seem to apply:

  1. My configuration sets the map output classes and the final output classes separately
  2. My reducer and mapper methods are annotated with @Override
  3. My reducer input is an Iterable of (ImmutableBytesWritable, Put), so ...

Here is my code:

Driver

public int run(String[] args) throws Exception { 
    int result=0; 
    String outputPath = args[1]; 
    Configuration configuration = getConf(); 
    configuration.set("data.seperator", DATA_SEPERATOR); 
    configuration.set("hbase.table.name",TABLE_NAME); 
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1); 
    Job job = new Job(configuration); 
    job.setJarByClass(HBaseBulkLoadDriver.class); 
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapperClass(HBaseBulkLoadMapper.class); 
    job.setReducerClass(HBaseBulkLoadReducer.class); 
    job.setOutputKeyClass(ImmutableBytesWritable.class); 
    job.setOutputValueClass(Put.class); 
    FileInputFormat.addInputPaths(job, args[0]); 
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true); 
    FileOutputFormat.setOutputPath(job, new Path(outputPath)); 
    job.setMapOutputValueClass(Put.class); 
    job.setNumReduceTasks(1); 
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME)); 
    job.waitForCompletion(true); 
    return result; 
} 

Mapper

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 
    private String hbaseTable; 
    private String dataSeperator; 
    private String columnFamily1; 
    private ImmutableBytesWritable hbaseTableName; 

    public void setup(Context context) { 
     Configuration configuration = context.getConfiguration(); 
     hbaseTable = configuration.get("hbase.table.name"); 
     dataSeperator = configuration.get("data.seperator"); 
     columnFamily1 = configuration.get("COLUMN_FAMILY_1"); 
     hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable)); 
    } 
     @Override 
    public void map(LongWritable key, Text value, Context context) { 
     try { 
      String[] values = value.toString().split(dataSeperator); 
      String rowKey = values[0]; 
      Put put = new Put(Bytes.toBytes(rowKey)); 
      // BUNCH OF ADDS (several column writes on the Put) 
      context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put); 
     } catch(Exception exception) { 
      exception.printStackTrace(); 
     } 
    } 
} 
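For context, the BUNCH OF ADDS placeholder presumably stands for a few column writes on the Put before it is emitted. A minimal sketch of what that might look like, assuming the old Put.add(family, qualifier, value) API used elsewhere in this code, with made-up qualifier names and value indices:

      // hypothetical expansion of "BUNCH OF ADDS": one cell per parsed input field 
      put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("col1"), Bytes.toBytes(values[1])); 
      put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("col2"), Bytes.toBytes(values[2])); 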

Reducer

public class HBaseBulkLoadReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> { 
    @Override 
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, 
            Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put>.Context context) 
            throws java.io.IOException, InterruptedException { 
        TreeMap<String, KeyValue> map = new TreeMap<String, KeyValue>(); 
        int count = 0; 
        Append nkv; 
        byte[] tmp = "".getBytes(); 
        Put pp = new Put(tmp); 
        try { 
            for (Put p : puts) { 
                byte[] r = "".getBytes(); 
                //KeyValue kv = new KeyValue(r); 
                if (count != 0) { 
                    r = p.getRow(); 
                    pp.add(new KeyValue(r)); 
                    //KeyValue k = map.get(row.toString()); 
                    //nkv = new Append(k.getRowArray()); 
                    //nkv=nkv.add(kv); 
                    //map.put(row.toString(), k.clone()); 
                    //context.write(row,nkv); 
                    //tmp=ArrayUtils.addAll(tmp,kv.getValueArray()); 
                    //map.put(row.toString(),new KeyValue(kv.getRowArray(),kv.getFamilyArray(),kv.getQualifierArray(),tmp)); 
                    count++; 
                    throw new RuntimeException(); 
                } else { 
                    r = p.getRow(); 
                    pp = new Put(row.toString().getBytes()); 
                    pp.add(new KeyValue(r)); 
                    //tmp=kv.clone().getValueArray(); 
                    //nkv = new Append(kv.getRowArray()); 
                    //map.put(row.toString(), kv.clone()); 
                    count++; 
                    throw new RuntimeException(); 
                } 
            } 
            context.write(row, pp); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 
} 

Well, I know the reducer is a bit of a mess, but the fact is that it throws a RuntimeException in both the if and the else branches, as you can see, and the bulk load still succeeds, so I am fairly sure the reducer is not running, and I have no idea why. All three files are packaged with Maven in the same directory, FYI.

Why the 'throw new RuntimeException();'? –

He is trying to see whether the block gets executed... "but when a runtime exception was thrown in the reducer and the code still worked fine, I realized the reducer was not running" – Tgsmith61591

I would think the reducer should run, given 'job.setNumReduceTasks(1)', but if 'Iterable puts' is empty then the for loop in the reducer is never entered and those exceptions are never thrown –

Answer

Found out where the mistake was. configureIncrementalLoad sets the reducer class to PutSortReducer or KeyValueSortReducer depending on the map output value class, so if I want to use a custom reducer class I have to set it after calling configureIncrementalLoad. After doing that I can see my reducer running. Just answering my own question so it can help anyone who runs into the same problem.

HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME)); 
job.setReducerClass(HBaseBulkLoadReducer.class); 
job.waitForCompletion(true);
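To see the override for yourself, one quick check (a sketch only, assuming the same old HFileOutputFormat/HTable APIs used in the question) is to print the job's reducer class before and after configureIncrementalLoad inside run():

// Sketch: configureIncrementalLoad silently replaces the reducer class. 
job.setReducerClass(HBaseBulkLoadReducer.class); 
System.out.println(job.getReducerClass().getName());   // HBaseBulkLoadReducer 
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, TABLE_NAME)); 
System.out.println(job.getReducerClass().getName());   // PutSortReducer - the custom reducer was overridden 
job.setReducerClass(HBaseBulkLoadReducer.class);        // re-set the custom reducer afterwards 
System.out.println(job.getReducerClass().getName());   // HBaseBulkLoadReducer again 
job.waitForCompletion(true); 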