我正在处理hadoop中输入的日志文件,其中的密钥不均匀分布。这意味着减值者的价值分布不均衡。例如,key1有1个值,key2有1000个值。处理Hadoop中密钥间值的不均匀分布mapreduce
有没有办法做一个相同的键关联的值的负载均衡[我不想改变我的钥匙也如果你知道哪些键将会有一个不同寻常的大量的
我正在处理hadoop中输入的日志文件,其中的密钥不均匀分布。这意味着减值者的价值分布不均衡。例如,key1有1个值,key2有1000个值。处理Hadoop中密钥间值的不均匀分布mapreduce
有没有办法做一个相同的键关联的值的负载均衡[我不想改变我的钥匙也如果你知道哪些键将会有一个不同寻常的大量的
值,你可以使用下面的技巧。
您可以实现自定义Partitioner
这将确保每个歪斜键进入到一个分区,然后一切就由获得分配给其余的分区的hashCode
(这是默认HashPartitioner
一样)。
您可以通过实现此接口创建自定义Partitioner
:
public interface Partitioner<K, V> extends JobConfigurable {
int getPartition(K key, V value, int numPartitions);
}
然后你就可以告诉Hadoop的使用您的Partitioner
有:
conf.setPartitionerClass(CustomPartitioner.class);
非常感谢@charles。不幸的是,我不知道哪个键会有大量的值。同样在你的解决方案中,这种方法会导致一个特定的reducer [接收1000个值的那个]来处理大量的数据。我担心的原因是因为对于属于特定键的每个值,我都进行了大量计算[可以说某个键将有75000个值,并且我将遍历reducer中的值并进行一些每次需要2分钟的计算] – udag
也许你能击中之前使用组合减速?这是相当推测...
想法是将每组密钥分区成预设最大大小的分区,然后将这些分区的k/v对输出到reducer。这段代码假设你已经在你的配置中设置了这个大小。
public static class myCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<Text> textList = new ArrayList<Text>();
int part = 0;
while (values.iterator().hasNext()) {
if (textList.size() <= Integer.parseInt(context.getConfiguration().get("yourMaxSize"))) {
textList.add(values.iterator().next());
} else {
for(Text t : textList) {
//essentially partitioning each key...
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
textList.clear();
}
part += 1;
}
//output any stragglers ...
for(Text t : textList) {
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
}
}
你能从算法的角度描述你的工作吗 - 一旦他们进入reducer(例如它是一个总和/分钟/最大/平均计算或类似的 - 你可以做什么?部分计算会被移植到组合器中,以减少映射器和缩减器之间数据流的偏斜键?) –