
Bulk load into HBase: Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell

In Java, I have to import data from a TSV file (about 21 * 10^6 lines) into an HBase table using MapReduce.
Each line looks like:

XYZ|XZS   YSY|SDS|XDA|JKX|SDS   0.XXXXXXXXX

The HTable has 5 column families: A, B, C, D, E.

The first field of every line (the pair XYZ|XZS) is my HBase rowkey.

The second field gives the 5 column qualifiers:

  1. YSY|SDS|XDA|JKX|SDS -> column family A
  2. YSY|SDS|XDA|JKX     -> column family B
  3. YSY|SDS|XDA         -> column family C
  4. YSY|SDS             -> column family D
  5. YSY                 -> column family E

The last field is the value to insert into the cell. I also have to sum (Σ) all the values that share the same qualifier (1, 2, 3, 4 or 5); that will be part of my reducer.
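To make the intended expansion concrete, here is a minimal standalone sketch of how a single input line should turn into five (rowkey + "_" + qualifier, value) pairs, one per column family. It is only an illustration: the value 0.123456789 and the class name LineExpansionSketch are made up and not part of the real job.

// Standalone illustration, not the real mapper: expands one TSV line into
// the five (rowkey_qualifier, value) pairs described above.
import java.util.Arrays;

public class LineExpansionSketch {

    public static void main(String[] args) {
        // Hypothetical example line; the value 0.123456789 is made up.
        String line = "XYZ|XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";

        String[] fields = line.split("\t");
        String rowkey = fields[0];                    // "XYZ|XZS"
        String[] q = fields[1].split("\\|");          // {"YSY","SDS","XDA","JKX","SDS"}
        float value = Float.parseFloat(fields[2]);

        // Prefixes of decreasing length map to column families A..E
        String[] families = {"A", "B", "C", "D", "E"};
        for (int len = q.length; len >= 1; len--) {
            String qualifier = String.join("|", Arrays.copyOf(q, len));
            String family = families[q.length - len];
            System.out.println(rowkey + "_" + qualifier + " -> " + value
                    + "   (column family " + family + ")");
        }
    }
}

The mapper below emits exactly these five pairs per input line; the reducer sums the values per (rowkey, qualifier) and writes one KeyValue into the matching column family.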

This is my driver:

public class Driver { 

    private static final String COLUMN_FAMILY_1 = "A"; 
    private static final String COLUMN_FAMILY_2 = "B"; 
    private static final String COLUMN_FAMILY_3 = "C"; 
    private static final String COLUMN_FAMILY_4 = "D"; 
    private static final String COLUMN_FAMILY_5 = "E"; 
    private static final String TABLENAME = "abe:data"; 
    private static final String DATA_SEPARATOR = "\t"; 


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
     Configuration configuration = HBaseConfiguration.create(); 

     //Configuration Settings about hbase table 
     configuration.set("hbase.table.name", TABLENAME); 
     configuration.set("colomn_family_1", COLUMN_FAMILY_1); 
     configuration.set("colomn_family_2", COLUMN_FAMILY_2); 
     configuration.set("colomn_family_3", COLUMN_FAMILY_3); 
     configuration.set("colomn_family_4", COLUMN_FAMILY_4); 
     configuration.set("colomn_family_5", COLUMN_FAMILY_5); 
     configuration.set("data_separator", DATA_SEPARATOR); 


     if (args.length != 2){ 
      System.out.println("Usage: "); 
      System.out.println("-\t args[0] -> HDFS input path"); 
      System.out.println("-\t args[1] -> HDFS output path"); 
      System.exit(1); 
     } 

     String inputPath = args[0]; 
     String outputPath = args[1]; 
     Path inputHdfsPath = new Path(inputPath); 
     Path outputHdfsPath = new Path(outputPath); 

     Job job = null; 

     try { 
      job = Job.getInstance(configuration); 
     } catch (IOException e) { 
      System.out.println("\n\t--->Exception: Error trying getinstance of job.<---\n"); 
      e.printStackTrace(); 
     } 

     job.setJobName("Bulk Loading HBase Table: "+ "\""+ TABLENAME+"\" with aggregation."); 
     job.setJarByClass(Driver.class); 

     //MAPPER 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setMapperClass(MappingClass.class); 
     job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
     job.setMapOutputValueClass(FloatWritable.class); 

     try { 

      FileInputFormat.addInputPath(job, inputHdfsPath); 

     } catch (IllegalArgumentException | IOException e) { 
      System.out.println("Error setting inputPath in FileInputFormat"); 
      e.printStackTrace(); 
     } 

     try { 

      FileSystem.get(configuration).delete(outputHdfsPath, true); 

     } catch (IllegalArgumentException | IOException e) { 
      System.out.println("Error deleting existing output path."); 
      e.printStackTrace(); 
     } 

     //Setting output FileSystem.Path to save HFile to bulkImport 
     FileOutputFormat.setOutputPath(job, outputHdfsPath);   
     FileSystem hdfs; 


     //Deleting output folder if exists 
     try { 

      hdfs = FileSystem.get(configuration); 
      if(hdfs.exists(outputHdfsPath)){ 
       hdfs.delete(outputHdfsPath, true); //Delete existing Directory 
      } 

     } catch (IllegalArgumentException | IOException e) { 
      e.printStackTrace(); 
     } 


     //Variables to access to HBase 
     Connection hbCon = ConnectionFactory.createConnection(configuration); 
     Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME)); 
     RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME)); 
     Admin admin = hbCon.getAdmin(); 
     HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator); 

     // Wait for HFiles creations 
     boolean result = job.waitForCompletion(true); 
     LoadIncrementalHFiles loadFfiles = null; 

     try { 
      loadFfiles = new LoadIncrementalHFiles(configuration); 
     } catch (Exception e) { 
      System.out.println("Error configuring LoadIncrementalHFiles."); 
      e.printStackTrace(); 
     } 

     if (result){ 
      loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator); 
      System.out.println("Bulk Import Completed."); 
     } 
     else { 
      System.out.println("Error in completing job. No bulkimport."); 
     } 

    } 

    } 

The mapper is:

public class MappingClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,FloatWritable>{ 
     private String separator; 


     @Override 
     protected void setup(Context context) throws IOException, InterruptedException { 
      Configuration configuration = context.getConfiguration(); 
      separator = configuration.get("data_separator"); 
     } 

     @Override 
     public void map(LongWritable key,Text line,Context context){ 

      String[] values = line.toString().split(separator); 
      String rowkey = values[0]; 
      String[] allQualifiers = values[1].split("\\|"); 
      String percentage = values[2]; 
      System.out.println(percentage); 

      String toQ1 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]+"|"+allQualifiers[4]); 
      String toQ2= new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]); 
      String toQ3 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]); 
      String toQ4 = new String(allQualifiers[0]+"|"+allQualifiers[1]); 
      String toQ5 = new String(allQualifiers[0]); 


      ImmutableBytesWritable ibw = new ImmutableBytesWritable(); 
      FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage)); 

      ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ1))); 

      try { 
       context.write(ibw, valueOut); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 


      ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ2))); 

      try { 
       context.write(ibw, valueOut); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 


      ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ3))); 

      try { 
       context.write(ibw, valueOut); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 


      ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ4))); 

      try { 
       context.write(ibw, valueOut); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 


      ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ5))); 

      try { 
       context.write(ibw, valueOut); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 

     } 

    } 

This is my reducer:

public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> { 
     private String columnFamily_1; 
     private String columnFamily_2; 
     private String columnFamily_3; 
     private String columnFamily_4; 
     private String columnFamily_5; 
     private float sum; 

     @Override 
     protected void setup(Context context) throws IOException, InterruptedException { 
      Configuration configuration = context.getConfiguration(); 

      columnFamily_1 = configuration.get("colomn_family_1"); 
      columnFamily_2 = configuration.get("colomn_family_2"); 
      columnFamily_3 = configuration.get("colomn_family_3"); 
      columnFamily_4 = configuration.get("colomn_family_4"); 
      columnFamily_5 = configuration.get("colomn_family_5"); 
     } 
     @Override 
     public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context){ 
      String[] rk_cq = key.toString().split("_"); 
      String rowkey = rk_cq[0]; 
      String cq = rk_cq[1]; 
      String colFamily = this.getFamily(cq);   
      sum = 0; 

      for(FloatWritable fw : values) 
       sum += fw.get(); 

      ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes()); 
      KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes()); 


      try { 
       context.write(ibw, kv); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 

     } 

     private String getFamily(String cq){ 
      String cf = new String(); 

      switch (cq.split("\\|").length) { 
      case 1: 
       cf = columnFamily_1; 
       break; 

      case 2: 
       cf = columnFamily_2; 
       break; 

      case 3: 
       cf = columnFamily_3; 
       break; 

      case 4: 
       cf = columnFamily_4; 
       break; 

      case 5: 
       cf = columnFamily_5; 
       break; 

      default: 
       break; 
      } 

      return cf; 
     } 

    } 

And this is the error:

17/05/08 20:04:22 INFO mapreduce.Job: map 100% reduce 29%
17/05/08 20:04:22 INFO mapreduce.Job: Task Id : attempt_1485334922305_5537_r_000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
 at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
 at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
 at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
 at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
 at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
 at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
 at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
 at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
 at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Thanks for your help.

Answer


I fixed it. In my driver I had forgotten:

job.setReducerClass(ReducingClass.class); 
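For anyone who hits the same trace: without an explicit reducer class, the job runs Hadoop's default identity Reducer, which forwards the FloatWritable map outputs straight to HFileOutputFormat2, and that writer expects Cell/KeyValue values, hence the ClassCastException at HFileOutputFormat2$1.write. As far as I know, HFileOutputFormat2.configureIncrementalLoad() only auto-selects a sort reducer when the map output value class is KeyValue, Put or Text, so with FloatWritable the custom reducer has to be set by hand. A minimal sketch of the relevant driver lines (same class names as above):

// Driver sketch: the reducer must be set explicitly, because
// configureIncrementalLoad() does not pick one for a FloatWritable
// map output value class.
HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);
job.setReducerClass(ReducingClass.class);   // the line that was missing

boolean result = job.waitForCompletion(true);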