2017-02-24 183 views
0

的映射是从两个地方 1)文章由用户访问阅读文件(按国家排序) 2)国家的统计(国家明智)从HDFS读写HBASE

两个映射器的输出是文本,文本

我从两个不同的组运行亚马逊集群方案

我的目标是读取数据,并在HBase的结合的结果,并将其存储。

HDFS到HDFS正在工作。 的代码卡住减少67%,为

17/02/24 10:45:31 INFO mapreduce.Job: map 0% reduce 0% 
17/02/24 10:45:37 INFO mapreduce.Job: map 100% reduce 0% 
17/02/24 10:45:49 INFO mapreduce.Job: map 100% reduce 67% 
17/02/24 10:46:00 INFO mapreduce.Job: Task Id : attempt_1487926412544_0016_r_000000_0, Status : FAILED 
Error: java.lang.IllegalArgumentException: Row length is 0 
     at org.apache.hadoop.hbase.client.Mutation.checkRow(Mutation.java:565) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:110) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:45) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:1) 
     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635) 
     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390) 
     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

Driver类是给出了错误

package com.happiestminds.hadoop; 



import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.MasterNotRunningException; 
import org.apache.hadoop.hbase.client.HBaseAdmin; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class Main extends Configured implements Tool { 

    /** 
    * @param args 
    * @throws Exception 
    */ 
    public static String outputTable = "mapreduceoutput"; 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new Main(), args); 
     System.exit(exitCode); 
    } 

    @Override 
    public int run(String[] args) throws Exception { 


     Configuration config = HBaseConfiguration.create(); 

     try{ 
      HBaseAdmin.checkHBaseAvailable(config); 
     } 
     catch(MasterNotRunningException e){ 
      System.out.println("Master not running"); 
      System.exit(1); 
     } 

     Job job = Job.getInstance(config, "Hbase Test"); 

     job.setJarByClass(Main.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 



     MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ArticleMapper.class); 
     MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, StatisticsMapper.class); 

     TableMapReduceUtil.addDependencyJars(job); 
     TableMapReduceUtil.initTableReducerJob(outputTable, CounterReducer.class, job); 

     //job.setReducerClass(CounterReducer.class); 

     job.setNumReduceTasks(1); 


     return job.waitForCompletion(true) ? 0 : 1; 
    } 

} 

减速类是

package com.happiestminds.hadoop; 

import java.io.IOException; 

import org.apache.hadoop.hbase.client.Mutation; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 


public class CounterReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { 

    public static final byte[] CF = "counter".getBytes(); 
    public static final byte[] COUNT = "combined".getBytes(); 


    @Override 
    protected void reduce(Text key, Iterable<Text> values, 
      Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
      throws IOException, InterruptedException { 

     String vals = values.toString(); 
     int counter = 0; 

     StringBuilder sbr = new StringBuilder(); 
     System.out.println(key.toString()); 
     for (Text val : values) { 
      String stat = val.toString(); 
      if (stat.equals("***")) { 
       counter++; 
      } else { 
       sbr.append(stat + ","); 
      } 

     } 
     sbr.append("Article count : " + counter); 


     Put put = new Put(Bytes.toBytes(key.toString())); 
     put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
     if (counter != 0) { 
      context.write(null, put); 
     } 

    } 



} 

依赖

<dependencies> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.7.3</version> 
     </dependency> 



     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-client</artifactId> 
      <version>1.2.2</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-common</artifactId> 
      <version>1.2.2</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-server</artifactId> 
      <version>1.2.2</version> 
     </dependency> 



    </dependencies> 

回答

0

你可以尝试检查你是否插入任何空值或不?

HBase数据模型不允许长度为零的行键,它应该至少为1个字节。

在执行put命令之前,请检查您的减速器代码,以确定某些值是否填充为空。

1

一个好的做法是在将它们提交到某个地方之前验证您的值。在您的具体情况下,您可以验证您的密钥sbr或将其包装到具有适当通知政策的try-catch部分中。你应该将他们输出到一些日志,如果他们不正确的,并与新的测试用例更新你的单元测试:

try 
{ 
    Put put = new Put(Bytes.toBytes(key.toString())); 
    put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
    if (counter != 0) { 
     context.write(null, put); 
    } 
} 
catch (IllegalArgumentException ex) 
{ 
     System.err.println("Error processing record - Key: "+ key.toString() +", values: " +sbr.ToString()); 
} 
0

你得到的错误是相当不言自明的。 HBase中的行键不能为空(尽管值可以)。

@Override 
protected void reduce(Text key, Iterable<Text> values, 
     Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
     throws IOException, InterruptedException { 
    if (key == null || key.getLength() == 0) { 
     // Log a warning about the empty key. 
     return; 
    } 
    // Rest of your reducer follows. 
} 
+0

现在减速机卡住了100%。 –

1

据节目抛出的异常很显然,密钥长度为0,因此投入的HBase之前,您可以检查是否密钥长度为0或不那么只有你可以投入到HBase的。

更清晰,为什么密钥长度的0不受HBase的

监守HBase的数据模型支持不允许长度为0的行键,它应该是至少1个字节。 0字节的行键被保留用于内部使用(指定空的开始键和结束键)。