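This is a fairly common question, but I can't work out what to choose: a Hadoop secondary sort on values.
I have the fields: ID, creationDate, state, dateDiff
ID is the natural key.
In my reducer I need to get:
KEY(ID), VALUE(creationDate, state, dateDiff)
The VALUE(creationDate, state, dateDiff) entries should be sorted by creationDate, then state.
Which key should I choose? I created a composite key (ID, creationDate, state).
I implemented: a partitioner by ID
A grouping comparator by ID
A sort comparator by ID, creationDate, state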
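For reference, the intended flow (sort on the full composite key, then group on ID only) can be sketched in plain Java without Hadoop. This is only an illustration under assumptions: `SecondarySortSketch` and `Row` are hypothetical names, and the comparator and map stand in for the sort and grouping comparators; none of this is Hadoop API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; these names are hypothetical, not Hadoop classes.
class SecondarySortSketch {

    // One input record: id is the natural key, the other fields travel with it.
    static final class Row {
        final long id;
        final long creationDate;
        final boolean state;
        final long dateDiff;

        Row(long id, long creationDate, boolean state, long dateDiff) {
            this.id = id;
            this.creationDate = creationDate;
            this.state = state;
            this.dateDiff = dateDiff;
        }

        long getId() { return id; }
        long getCreationDate() { return creationDate; }
        boolean isState() { return state; }
    }

    // Stands in for the sort comparator: id, then creationDate, then state.
    static final Comparator<Row> SORT = Comparator
            .comparingLong(Row::getId)
            .thenComparingLong(Row::getCreationDate)
            .thenComparing(Row::isState);

    // Stands in for the grouping comparator: sorted rows sharing an id form one group,
    // which corresponds to one reduce() call receiving all of those rows in order.
    static Map<Long, List<Row>> sortAndGroup(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(SORT);
        Map<Long, List<Row>> groups = new LinkedHashMap<>();
        for (Row row : sorted) {
            groups.computeIfAbsent(row.getId(), id -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> input = List.of(
                new Row(1, 789, true, 7),
                new Row(1, 123, true, 6),
                new Row(1, 456, false, 6));
        sortAndGroup(input).forEach((id, rows) -> {
            System.out.println("key=" + id);
            rows.forEach(r -> System.out.println(
                    "  " + r.creationDate + " " + r.state + " " + r.dateDiff));
        });
    }
}
```

In this sketch every row for an ID ends up in the same group, ordered by creationDate and state, which is the behaviour expected from the reducer.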
My reducer only receives one record per ID. For example, given:
1 123 true 6
1 456 false 6
1 789 true 7
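I only get
1 123 true 6
in my reducer. It looks as if my sort comparator, partitioner and grouping comparator are not being applied :( I am clearly missing something in my understanding.
Here is my code: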
public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue> {
    private static final Log LOG = LogFactory.getLog(POIMapper.class);

    @Override
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
        Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
        context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
    }
}
public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(POIReducer.class);
    private final Text textForOutput = new Text();

    @Override
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
            throws IOException, InterruptedException {
        XVLROutput out = null;
        // Just check that values are correctly attached to keys. No logic here...
        LOG.info("\nPOIReducer: key:" + key);
        for (XVLRValue value : values) {
            LOG.info("\n --- --- --- value:" + value + "\n");
            textForOutput.set(print(key, value));
            context.write(key.getMsisdn(), textForOutput);
        }
    }

    private String print(XVLRKey key, XVLRValue value) {
        StringBuilder builder = new StringBuilder();
        builder.append(value.getLac()).append("\t")
                .append(value.getCellId()).append("\t")
                .append(key.getDateOccurrence()).append("\t")
                .append(value.getTimeDelta());
        return builder.toString();
    }
}
Job code:
JobBuilder<POIJob> jobBuilder = createTestableJobInstance();
jobBuilder.withOutputKey(XVLRKey.class);
jobBuilder.withOutputValue(XVLRValue.class);
jobBuilder.withMapper(POIMapper.class);
jobBuilder.withReducer(POIReducer.class);
jobBuilder.withInputFormat(TextInputFormat.class);
jobBuilder.withOutputFormat(TextOutputFormat.class);
jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);
boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
MatcherAssert.assertThat(result, Matchers.is(true));
public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {
    @Override
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
        // Mask the sign bit rather than using Math.abs: Math.abs(Integer.MIN_VALUE) is still negative.
        return ((key.getMsisdn().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
    }
}
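A partition index has to fall in the range [0, numPartitions). `Math.abs` on its own does not guarantee that, because `Math.abs(Integer.MIN_VALUE)` is still `Integer.MIN_VALUE`; clearing the sign bit does, and that is the approach Hadoop's own `HashPartitioner` takes. The following is a minimal plain-Java sketch of the difference; `PartitionIndexSketch` and its method names are hypothetical.

```java
// Plain-Java illustration only; PartitionIndexSketch is a hypothetical name, not part of Hadoop.
class PartitionIndexSketch {

    // Math.abs(Integer.MIN_VALUE) overflows and stays negative, so the result can be negative.
    static int absPartition(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Clearing the sign bit first always yields an index in [0, numPartitions).
    static int maskedPartition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE; // the one input for which Math.abs misbehaves
        System.out.println("abs:    " + absPartition(hash, 3));
        System.out.println("masked: " + maskedPartition(hash, 3));
    }
}
```

This is unlikely to explain missing values on its own, but a negative index would make the job fail for the affected key.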
public class XVLRCompositeKeyComparator extends WritableComparator {
    protected XVLRCompositeKeyComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        return key1.compareTo(key2);
    }
}
public class XVLRKeyGroupingComparator extends WritableComparator {
    protected XVLRKeyGroupingComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        return key1.getMsisdn().compareTo(key2.getMsisdn());
    }
}
public class XVLRKey implements WritableComparable<XVLRKey> {
    private final LongWritable msisdn;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
    // getters-setters
}
public class XVLRValue implements WritableComparable<XVLRValue> {
    private final LongWritable lac;
    private final LongWritable cellId;
    private final LongWritable timeDelta;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
    // getters-setters
}
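The sort comparator delegates to `XVLRKey.compareTo`, which is not shown in the post. As a hedged illustration of the ordering that a composite key of this shape would typically need, here is a plain-Java sketch with primitive fields in place of the Writable wrappers. `CompositeKeySketch` is a hypothetical name and this is not the actual `XVLRKey` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Plain-Java illustration only; a hypothetical stand-in for a composite key, not the real XVLRKey.
class CompositeKeySketch implements Comparable<CompositeKeySketch> {
    final long msisdn;
    final long dateOccurrence;
    final boolean state;

    CompositeKeySketch(long msisdn, long dateOccurrence, boolean state) {
        this.msisdn = msisdn;
        this.dateOccurrence = dateOccurrence;
        this.state = state;
    }

    // Natural key first, then the secondary-sort fields in the desired order.
    @Override
    public int compareTo(CompositeKeySketch other) {
        int byMsisdn = Long.compare(msisdn, other.msisdn);
        if (byMsisdn != 0) {
            return byMsisdn;
        }
        int byDate = Long.compare(dateOccurrence, other.dateOccurrence);
        if (byDate != 0) {
            return byDate;
        }
        return Boolean.compare(state, other.state);
    }

    // equals and hashCode are kept consistent with compareTo.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CompositeKeySketch)) {
            return false;
        }
        CompositeKeySketch k = (CompositeKeySketch) o;
        return msisdn == k.msisdn && dateOccurrence == k.dateOccurrence && state == k.state;
    }

    @Override
    public int hashCode() {
        return Objects.hash(msisdn, dateOccurrence, state);
    }

    @Override
    public String toString() {
        return msisdn + " " + dateOccurrence + " " + state;
    }

    public static void main(String[] args) {
        List<CompositeKeySketch> keys = new ArrayList<>();
        keys.add(new CompositeKeySketch(1, 789, true));
        keys.add(new CompositeKeySketch(1, 123, true));
        keys.add(new CompositeKeySketch(1, 456, false));
        Collections.sort(keys);
        keys.forEach(System.out::println);
    }
}
```

If `compareTo` compared only `msisdn`, keys with the same ID would be treated as equal during the sort and the secondary ordering would be lost, so this method is worth checking.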
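Note that XVLRKey and XVLRValue have duplicated fields: I repeated dateOccurrence, which is already declared in XVLRKey, because I want the values in my reducer to arrive sorted by dateOccurrence. I can't find a way to solve this without the duplication.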
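One possible way around the duplication, sketched here under assumptions: keep the sort fields only in the key and read them from the key while iterating. As far as I know, in Hadoop's `org.apache.hadoop.mapreduce` API the key object passed to `reduce` is refreshed as the values iterator advances, and the posted `print` method already reads `key.getDateOccurrence()`. The plain-Java sketch below only simulates that pairing; `DedupFieldsSketch`, `Key`, `Value` and `Pair` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; all names here are hypothetical, not Hadoop classes.
class DedupFieldsSketch {

    // Sort fields live only in the key.
    static final class Key {
        final long msisdn;
        final long dateOccurrence;
        final boolean state;

        Key(long msisdn, long dateOccurrence, boolean state) {
            this.msisdn = msisdn;
            this.dateOccurrence = dateOccurrence;
            this.state = state;
        }
    }

    // The value carries only the fields that are not part of the key.
    static final class Value {
        final long lac;
        final long cellId;
        final long timeDelta;

        Value(long lac, long cellId, long timeDelta) {
            this.lac = lac;
            this.cellId = cellId;
            this.timeDelta = timeDelta;
        }
    }

    static final class Pair {
        final Key key;
        final Value value;

        Pair(Key key, Value value) {
            this.key = key;
            this.value = value;
        }
    }

    // Sorts by the composite key, then builds each output line from key and value together.
    static List<String> reduceAll(List<Pair> pairs) {
        List<Pair> sorted = new ArrayList<>(pairs);
        sorted.sort(Comparator
                .comparingLong((Pair p) -> p.key.msisdn)
                .thenComparingLong(p -> p.key.dateOccurrence)
                .thenComparing((x, y) -> Boolean.compare(x.key.state, y.key.state)));
        List<String> lines = new ArrayList<>();
        for (Pair p : sorted) {
            // dateOccurrence comes from the key, so the value does not need its own copy.
            lines.add(p.key.msisdn + "\t" + p.value.lac + "\t" + p.value.cellId
                    + "\t" + p.key.dateOccurrence + "\t" + p.value.timeDelta);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Pair> pairs = List.of(
                new Pair(new Key(1, 789, true), new Value(30, 300, 7)),
                new Pair(new Key(1, 123, true), new Value(10, 100, 6)),
                new Pair(new Key(1, 456, false), new Value(20, 200, 6)));
        reduceAll(pairs).forEach(System.out::println);
    }
}
```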
Can you post some code?