2013-01-16 154 views

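This is a fairly common problem, but I don't understand what to choose. Hadoop secondary sort on values: sorting and shuffling values.

I have the fields: ID, creationDate, state, dateDiff

ID is the natural key.

In my reducer I need to get:

KEY(ID), VALUE(creationDate, state, dateDiff)

VALUE(creationDate, state, dateDiff) should be sorted by: creationDate, state

Which key should I choose? I created a composite key (id, creationDate, state).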

I implemented the following:

Partitioner by ID

Grouper by ID

Sorter by id, creationDate, state
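
To illustrate how these three pieces are meant to cooperate, here is a minimal plain-Java sketch that imitates the pattern without any Hadoop classes. `Rec`, `SORT_ORDER`, `partition` and `sortAndGroup` are hypothetical names introduced for illustration only, and the record with id 2 is made-up sample data; the real job would use a Partitioner, a sort comparator and a grouping comparator instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java imitation of secondary sort; no Hadoop classes are involved.
class SecondarySortSketch {

    // Hypothetical record holding the four fields from the question.
    static final class Rec {
        final long id;
        final long creationDate;
        final boolean state;
        final long dateDiff;

        Rec(long id, long creationDate, boolean state, long dateDiff) {
            this.id = id;
            this.creationDate = creationDate;
            this.state = state;
            this.dateDiff = dateDiff;
        }

        @Override
        public String toString() {
            return id + " " + creationDate + " " + state + " " + dateDiff;
        }
    }

    // Sort order: the whole composite key (id, creationDate, state).
    static final Comparator<Rec> SORT_ORDER = Comparator
            .comparingLong((Rec r) -> r.id)
            .thenComparingLong(r -> r.creationDate)
            .thenComparing((a, b) -> Boolean.compare(a.state, b.state));

    // Partitioning: by id only, so all records of one id share a partition.
    static int partition(Rec r, int numPartitions) {
        return (Long.hashCode(r.id) & Integer.MAX_VALUE) % numPartitions;
    }

    // Grouping: sort first, then start a new group whenever the id changes.
    static List<List<Rec>> sortAndGroup(List<Rec> input) {
        List<Rec> sorted = new ArrayList<>(input);
        sorted.sort(SORT_ORDER);
        List<List<Rec>> groups = new ArrayList<>();
        for (Rec r : sorted) {
            List<Rec> last = groups.isEmpty() ? null : groups.get(groups.size() - 1);
            if (last == null || last.get(0).id != r.id) {
                last = new ArrayList<>();
                groups.add(last);
            }
            last.add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec(1, 789, true, 7),
                new Rec(2, 100, false, 3),
                new Rec(1, 123, true, 6),
                new Rec(1, 456, false, 6));
        for (List<Rec> group : sortAndGroup(input)) {
            System.out.println("id " + group.get(0).id + " -> " + group);
        }
    }
}
```

In this sketch each group corresponds to one reduce call: the group is chosen by id alone, while the order inside the group comes from the full composite key.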

My reducer only gets the unique IDs. For example, given:

1 123 true 6 
1 456 false 6 
1 789 true 7 

I only get

1 123 true 6 

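in my reducer. It looks like I did not get the sorter, partitioner, and grouper right :( There is a lack of understanding somewhere.

Here is my code: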

public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue>{ 

    private static final Log LOG = LogFactory.getLog(POIMapper.class); 

    @Override 
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException { 
     Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER); 
     context.write(xvlrPair.getValue0(), xvlrPair.getValue1()); 
    } 

} 

public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text>{ 

    private static final Log LOG = LogFactory.getLog(POIReducer.class); 

    private final Text textForOutput = new Text(); 

    @Override
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context) 
                      throws IOException, InterruptedException { 
     // Just check that values are correctly attached to keys. No logic here...
     LOG.info("\nPOIReducer: key:"+key); 
     for(XVLRValue value : values){ 
      LOG.info("\n --- --- --- value:"+value+"\n"); 
      textForOutput.set(print(key, value)); 
      context.write(key.getMsisdn(), textForOutput); 
     } 
    } 

    private String print(XVLRKey key, XVLRValue value){ 
     StringBuilder builder = new StringBuilder(); 
     builder.append(value.getLac())   .append("\t") 
       .append(value.getCellId())  .append("\t") 
       .append(key.getDateOccurrence()) .append("\t") 
       .append(value.getTimeDelta()); 
     return builder.toString(); 
    } 
} 

Job code:

JobBuilder<POIJob> jobBuilder = createTestableJobInstance(); 

     jobBuilder.withOutputKey(XVLRKey.class); 
     jobBuilder.withOutputValue(XVLRValue.class); 

     jobBuilder.withMapper(POIMapper.class); 
     jobBuilder.withReducer(POIReducer.class); 

     jobBuilder.withInputFormat(TextInputFormat.class); 
     jobBuilder.withOutputFormat(TextOutputFormat.class); 

     jobBuilder.withPartitioner(XVLRKeyPartitioner.class); 
     jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class); 
     jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class); 

     boolean result = buildSubmitAndWaitForCompletion(jobBuilder); 
     MatcherAssert.assertThat(result, Matchers.is(true)); 




public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> { 

    @Override 
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) { 
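      // Mask the sign bit instead of using Math.abs, which stays negative for Integer.MIN_VALUE.
      return ((key.getMsisdn().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;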
    } 
} 

public class XVLRCompositeKeyComparator extends WritableComparator { 

    protected XVLRCompositeKeyComparator() { 
     super(XVLRKey.class, true); 
    } 

    @Override 
    public int compare(WritableComparable writable1, WritableComparable writable2) { 
     XVLRKey key1 = (XVLRKey) writable1; 
     XVLRKey key2 = (XVLRKey) writable2; 
     return key1.compareTo(key2); 
    } 
} 

public class XVLRKeyGroupingComparator extends WritableComparator { 

    protected XVLRKeyGroupingComparator() { 
     super(XVLRKey.class, true); 
    } 

    @Override 
    public int compare(WritableComparable writable1, WritableComparable writable2) { 

     XVLRKey key1 = (XVLRKey) writable1; 
     XVLRKey key2 = (XVLRKey) writable2; 

     return key1.getMsisdn().compareTo(key2.getMsisdn()); 

    } 
} 

public class XVLRKey implements WritableComparable<XVLRKey>{ 

    private final LongWritable msisdn; 
    private final LongWritable dateOccurrence; 
    private final BooleanWritable state; 
//getters-setters 
} 

public class XVLRValue implements WritableComparable<XVLRValue> { 

    private final LongWritable lac; 
    private final LongWritable cellId; 
    private final LongWritable timeDelta; 
    private final LongWritable dateOccurrence; 
    private final BooleanWritable state; 
//getters-setters
} 

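Note that XVLRKey and XVLRValue do have duplicated fields. I duplicated dateOccurrence and state, which are also declared in XVLRKey, because I want the values in my reducer to arrive sorted by dateOccurrence. I have not found a way to solve this without the duplication.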

Can you post some code? –

Answer

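In the secondary sort case (as you describe it), the contents of the key change each time you retrieve the next value from the iterator.

This happens because the Hadoop framework reuses object instances to avoid object creation and garbage collection as much as possible.

So when you call next(), the framework also changes the data inside the key instance.

So, if you move the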

LOG.info("\nPOIReducer: key:"+key); 

statement so that it is inside the for loop, you should see all the keys come through.
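
The effect can be reproduced without Hadoop. The following is a minimal plain-Java sketch, not the framework's actual code: `Key` and `ReusingIterator` are hypothetical stand-ins for a Writable key and the reducer's value iterator, and the row layout {id, date, timeDelta} is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Imitates a value iterator that refills ONE shared key object on every next().
class KeyReuseSketch {

    // Hypothetical mutable key, standing in for a Writable composite key.
    static final class Key {
        long id;
        long date;

        @Override
        public String toString() {
            return id + "/" + date;
        }
    }

    static final class ReusingIterator implements Iterator<Long> {
        private final long[][] rows; // each row is {id, date, timeDelta}, already sorted
        private final Key sharedKey;
        private int pos = 0;

        ReusingIterator(long[][] rows, Key sharedKey) {
            this.rows = rows;
            this.sharedKey = sharedKey;
            if (rows.length > 0) {
                fill(rows[0]); // the key starts out holding the first record
            }
        }

        private void fill(long[] row) {
            sharedKey.id = row[0];
            sharedKey.date = row[1];
        }

        @Override
        public boolean hasNext() {
            return pos < rows.length;
        }

        @Override
        public Long next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            long[] row = rows[pos++];
            fill(row);      // same key instance, new contents
            return row[2];  // the value for this record
        }
    }

    // Records what the key looks like before the loop and after each next().
    static List<String> keysSeen(long[][] rows) {
        Key key = new Key();
        ReusingIterator values = new ReusingIterator(rows, key);
        List<String> seen = new ArrayList<>();
        seen.add("before loop: " + key);
        while (values.hasNext()) {
            values.next();
            seen.add("in loop: " + key);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[][] rows = { {1, 123, 6}, {1, 456, 6}, {1, 789, 7} };
        for (String line : keysSeen(rows)) {
            System.out.println(line);
        }
    }
}
```

Reading the key once before the loop shows only the first record's key, while reading it inside the loop shows a different date for every value, even though it is the same object throughout.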

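Because of this effect, I build my jobs with basically the following "rule":

The key is used only by the framework to route the values to the correct reducer.

This means:

  1. Everything I may need must be present in the value.
  2. In the reducer I only look at the values, and I always discard/ignore the key.
  3. The attributes used to create the key can also be found in the value.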
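
A rough sketch of a value that follows this rule is shown below. It is plain Java with no Hadoop dependency: `FullValue` is a hypothetical class, its field names are borrowed from the question, and its `write`/`readFields` methods only mirror the method names of Hadoop's `Writable` interface. The `roundTrip` helper is an assumption added to show that the value alone carries everything needed to build the output line.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// A value that also carries the key attributes, so the reducer can ignore the key.
class SelfContainedValueSketch {

    // Hypothetical value class; field names follow the question.
    static final class FullValue {
        long msisdn;          // also part of the key
        long dateOccurrence;  // also part of the key
        boolean state;        // also part of the key
        long lac;
        long cellId;
        long timeDelta;

        // Serializes every field in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeLong(msisdn);
            out.writeLong(dateOccurrence);
            out.writeBoolean(state);
            out.writeLong(lac);
            out.writeLong(cellId);
            out.writeLong(timeDelta);
        }

        // Reads the fields back in exactly the order they were written.
        void readFields(DataInput in) throws IOException {
            msisdn = in.readLong();
            dateOccurrence = in.readLong();
            state = in.readBoolean();
            lac = in.readLong();
            cellId = in.readLong();
            timeDelta = in.readLong();
        }

        // Builds the output line from the value only; no key is consulted.
        String toLine() {
            return msisdn + "\t" + lac + "\t" + cellId + "\t" + dateOccurrence + "\t" + timeDelta;
        }
    }

    // Serializes a value and deserializes it into a fresh instance.
    static FullValue roundTrip(FullValue v) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            v.write(new DataOutputStream(buffer));
            FullValue copy = new FullValue();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FullValue v = new FullValue();
        v.msisdn = 1;
        v.dateOccurrence = 456;
        v.state = false;
        v.lac = 10;
        v.cellId = 20;
        v.timeDelta = 6;
        System.out.println(roundTrip(v).toLine());
    }
}
```

The trade-off is the one raised in the question: the key attributes are stored twice, in exchange for a reducer that never depends on the current contents of the key object.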

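OK. So I did implement the key, sorter, grouper, and partitioner correctly? Now I can remove the duplicated fields from XVLRValue, right? Will they be updated in the key after each call to iterator.next() on the Iterable? Right now my solution passes the QA tests (reference input, reference output tests). But data duplication is bad. So I will try to remove it and see how the key values change? – Sergey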

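I can't tell whether you implemented everything correctly. I assume you implemented correct compareTo methods for your key and value classes? If not, it will compare byte by byte. –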
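
For reference, a compareTo for a composite key of this shape might look roughly like the sketch below. It is plain Java using `Comparable` rather than Hadoop's `WritableComparable`, and `Key` is a hypothetical class whose field names are taken from the question; the actual XVLRKey implementation is not shown in the thread.

```java
import java.util.Arrays;

// Composite key ordered by msisdn, then dateOccurrence, then state.
class CompositeKeySketch {

    // Hypothetical key class; field names follow the question.
    static final class Key implements Comparable<Key> {
        final long msisdn;
        final long dateOccurrence;
        final boolean state;

        Key(long msisdn, long dateOccurrence, boolean state) {
            this.msisdn = msisdn;
            this.dateOccurrence = dateOccurrence;
            this.state = state;
        }

        @Override
        public int compareTo(Key other) {
            int c = Long.compare(msisdn, other.msisdn);
            if (c != 0) {
                return c;
            }
            c = Long.compare(dateOccurrence, other.dateOccurrence);
            if (c != 0) {
                return c;
            }
            return Boolean.compare(state, other.state); // false sorts before true
        }

        @Override
        public String toString() {
            return msisdn + "/" + dateOccurrence + "/" + state;
        }
    }

    // Sorts a copy of the given keys by their natural order and renders them.
    static String sortedKeys(Key... keys) {
        Key[] copy = keys.clone();
        Arrays.sort(copy);
        return Arrays.toString(copy);
    }

    public static void main(String[] args) {
        System.out.println(sortedKeys(
                new Key(1, 789, true),
                new Key(1, 123, true),
                new Key(1, 456, false)));
    }
}
```

Using `Long.compare` and `Boolean.compare` avoids the overflow that subtracting two long values and casting to int could cause.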