2013-11-26 44 views
1

我有一个问题。根据最大值计算Hadoop

我想对大数据集进行映射,映射过程取决于最大值。

例如

Input: 
(key) (value)  
-------------- 
key1 1 
key2 2 
key3 5 
key4 6 
key5 9 

我的计算取决于这些值的最大值,以每个点映射。 我想根据值中的最大值将这些值分组。

例如,上一个输入的最大数量是9,我想将它们映射到3个组。我将使用新密钥:(int) value/(Max/3)

output(of mapping) 
(new key) (new Value)  
---------------------- 
0   key1 
0   key2 
1   key3 
1   key4 
2   key5 

而且我有以下映射:

protected void map(Object key, InWritable value, Context context) 
     throws IOException, InterruptedException { 
    int MaximumValue=???; 
    int newKey = (int)value/(MaximumValue/3); 
    context.write(newKey,Key); 
} 

但是,如何计算的最大关键,遍历所有记录过吗?

+0

你不能,你需要在reduce函数中做那个部分。 在缩减器中,当所有映射器值合并在一起时,只有您可以找到最大密钥。 –

+0

谢谢穆克什,但你的意思是我应该两个链接工作?第一个将找到最大值,第二个工作将分组? –

+0

是的。这是正确的。 你能告诉我这行是干什么**(int)值/(最大/ 3)**。 因为我认为这行不会帮你创建3组。 –

回答

1

你可以这样做。

注意:我在说w.r.t Hadoop 1.2.1。您可能需要对较新的API进行一些修改。

在您的驱动程序中,阅读inputpath并解析它并找到最大值。

BufferedReader br = new BufferedReader(new InputStreamReader(
      fs.open(inpath))); 
    String line = ""; 
    line = br.readLine(); 
    int max = Integer.MIN_VALUE; 
    try { 
     while (line != null) { 
      if (line.trim().length() == 0 || line.trim().equals("")) { 
       line = br.readLine(); 
       continue; 
      } 
      String[] parts = line.split(" "); 
      int val = Integer.parseInt(parts[1]); 
      if (val > max) 
       max = val; 
      line = br.readLine(); 
     } 
    } finally { 
     br.close(); 
    } 
} 

将其设置在您的配置中。

conf.setInt("max_val", max); 

并通过覆盖configure()方法在您的映射器中读取它。对于较新的API,我认为你必须重写setup()方法。

@Override 
public void configure(JobConf conf) { 
    max = Integer.parseInt(conf.get("max_val")); 
} 
+0

这不起作用,因为按照您的建议算法,应改变以前记录的关键(即)如果最大值是9,直到有些点改变为21,这意味着我应该改变前一个记录的关键,而不是前一个输出到(0 key1,0 key2,0 key3,0 key4,1 key5),这是无法完成的。 –

+0

我不明白你的评论^ –

+0

对不起,我想知道你的解决方案,我只是明白它,你的意思是检测驱动程序的最大值?但是如果我有很长的记录,这个解决方案也很耗时。我更喜欢使用链式作业。 –