关于第一个问题:
你或许应该整条生产线传递给映射器和只保留第三令牌映射和地图(user
,1)每次。
public class AnalyzeLogs
{
public static class FindFriendMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
String tempStrings[] = value.toString().split(",");
context.write(new Text(tempStrings[2]), new IntWritable(1));
}
}
对于第二个问题,我相信你不能避免在第二个MR Job之后(我想不出任何其他方式)。所以第一份工作的缩减器只会聚合这些值并给出每个键的总和,按键排序。这还不是你需要的。
因此,您将此作业的输出作为输入传递给第二个MR作业。这项工作的目标是在传递给reducer之前按价值做一些特殊的分类(这绝对不会)。
我们的映射器的第二件事会有如下:
public static class SortLogsMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
context.write(value, new NullWritable());
}
正如你所看到的,我们不为这个映射器使用价值可言。相反,我们创建了一个密钥,包含我们的值(我们的密钥是key1 value1
格式)。 现在还有什么要做,是要指定框架,它应该基于value1
而不是整个key1 value1
进行排序。因此,我们将实现一个自定义的SortComparator
:
public static class LogDescComparator extends WritableComparator
{
protected LogDescComparator()
{
super(Text.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
Text t1 = (Text) w1;
Text t2 = (Text) w2;
String[] t1Items = t1.toString().split(" "); //probably it's a " "
String[] t2Items = t2.toString().split(" ");
String t1Value = t1Items[1];
String t2Value = t2Items[1];
int comp = t2Value.compareTo(t1Value); // We compare using "real" value part of our synthetic key in Descending order
return comp;
}
}
您可以设置自定义的比较如下:job.setSortComparatorClass(LogDescComparator.class);
作业的减速应该什么都不做。但是,如果我们不设置缩减器,则映射器键的排序将不会完成(我们需要这样做)。因此,您需要将IdentityReducer
设置为第二个MR作业的Reducer,以便不减少但仍然确保映射器的合成键按照我们指定的方式排序。
非常感谢您的详细解答。这真的很有帮助! – Searene
@MarkZar不客气:) –