2012-09-12 42 views
4

假设我有两个键值数据集 - 数据集A和B,我们称之为它们。我想用集合B中的数据更新集合A中的所有数据,其中两个数据与键匹配。使用Hadoop通过两个数据集进行映射

因为我正在处理大量的数据,所以我使用Hadoop来处理MapReduce。我担心的是,为了在A和B之间进行这种关键匹配,我需要将所有集合A(大量数据)加载到每个映射器实例的内存中。这似乎相当低效。

会不会有推荐的方法来做到这一点,每次都不需要重复加载工作?

一些pseudcode澄清目前我在做什么:

Load in Data Set A # This seems like the expensive step to always be doing 
Foreach key/value in Data Set B: 
    If key is in Data Set A: 
     Update Data Seta A 

回答

3

根据该文件,MapReduce框架包括the following steps

  1. 地图
  2. 排序/分区
  3. 结合(可选)
  4. 减少

您已经描述了一种执行连接的方法:将每个映射器中的所有集合A加载到内存中。你是正确的,这是低效的。

请注意,如果两个集合都按键排序和分区,则可以将大连接分割成任意多个较小的连接。 MapReduce在上面的步骤(2)中通过键对每个映射器的输出进行排序。 Sorted Map输出然后按键分区,以便每个Reducer创建一个分区。对于每个唯一键,Reducer将接收来自Set A和Set B的所有值。

要完成加入,Reducer只需要输出密钥和Set B中的更新值(如果存在);否则,从Set A输出密钥和原始值。要区分来自Set A和Set B的值,请尝试在Mapper的输出值上设置一个标志。

+0

如果我理解正确,你说的Hadoop'-input'应该是-both-设置A和B(带有一个标志来区分它们之间的值),然后在缩小步骤中,将被分组在一起,这样就可以加入它们。那是对的吗? – babonk

+0

对,你需要将A和B都输入到你的映射器中,并确保你输出了一个标记,以便减速器能够区分这两者。我还假设你正在编写你自己的'Map'(实现'mapred.Mapper')和'Reduce'(实现'mapred.Reducer')类。像@ joe-k在一个单独的答案中说,Pig,Hive和Cascading都是合法的解决方案,它将为您生成此代码并在Hadoop上运行。 – Shahin

+1

您可以使用布隆过滤器进行小型优化,如果集合A比集合B大很多,并且存在与集合B不匹配并且集合B不适合内存的记录,那么这将很有用。创建一个MapReduce作业,它在Set B上训练Bloom Filter,而不是在最终的MapReduce Job中执行所有建议的操作,但是在使用Bloom Filter的Mapper过滤器Set A中,这将减少Set A中的溢出记录的数量有匹配的B组记录,因此你的工作可能会运行得更快。 – alexeipab

2

Cloudera的This video tutorial给出了一个关于如何通过MapReduce从大约12分钟开始进行大规模加入的很好的描述。
以下是他为将文件B中的记录连接到关键字K上的文件A中的记录以及伪代码而规定的基本步骤。如果这里有任何不清楚的地方,我建议观看视频,因为他比我更擅长解释视频。

在您的映射:

K from file A: 
    tag K to identify as Primary Key 
    emit <K, value of K> 

K from file B: 
    tag K to identify as Foreign Key 
    emit <K, record> 

写分拣机和石斑鱼将忽略PK/FK标签,让您的记录,无论他们是否是PK记录或FK发送到相同的减速记录并分组在一起。

写一个比较器,比较PK和FK键并首先发送PK。

该步骤的结果将是所有具有相同密钥的记录将被发送到同一个Reducer并处于相同的一组值中以减少。带有PK标记的记录将是第一个,然后是来自需要被连接的B的所有记录。现在,减速机:

value_of_PK = values[0] // First value is the value of your primary key 
for value in values[1:]: 
    value.replace(FK,value_of_PK) // Replace the foreign key with the key's value 
    emit <key, value> 

这样做的结果将是文件B,由K的文件A.值替代k的所有事件还可以扩展,以实现一个完整的内部连接,或把两个文件全部写出来用于直接数据库存储,但一旦你得到这个工作,这些都是非常微不足道的修改。

+0

不错。将它标记为A和B中的哈希值而不是密钥本身不起作用吗?然后,我不需要编写自己的Sorter和Grouper,并且可以在Reduce步骤中循环访问该组,以查看它们是否存在 – babonk

+0

@babonk对不起,不知道你的意思。你是指在散列阶段标记值? – HypnoticSheep

+0

你似乎建议标记K,而不是K的值。我只是说它可能更容易标记值,所以我不需要写我自己的分类器/石斑鱼 – babonk

3

到目前为止发布的所有答案都是正确的 - 这应该是一个Reduce方面的加入...但是没有必要重新发明轮子!你有没有考虑过PigHiveCascading?他们都加入了内置的功能,并且相当优化。

+0

哇,很酷。现在我要坚持使用Hadoop,但我可能会学习这些框架中的一个 – babonk