2014-03-07 91 views
1

我目前正在探索布洛姆过滤器。我已经浏览了大部分关于开花装饰者的博客,并且知道在联接的情况下哪些仍然无法找出示例。布洛姆过滤器减少侧加入

每篇文章都说它会减少网络I/O,但没有一个显示如何?尤其是一个很好的http://vanjakom.wordpress.com/tag/distributed-cache/,但它似乎很复杂,因为我刚开始使用地图缩小。

任何一个可以帮助我在下面的例子中实现布隆过滤器(reduceside加入)

2 mapers读取用户记录和部门记录和减速机加入

用户记录

ID,名称

3738,Richie Gore

12946,罗尼山姆

17556,大卫·加特

3443,雷切尔·史密斯

5799,保罗罗斯塔

部门的记录

3738,销售

12946 ,营销

17556,营销

3738,销售

3738,销售

代码

public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{ 

private Text outkey = new Text(); 
private Text outval = new Text(); 
private String id, name; 

public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter) 
      throws IOException { 

    String line = value.toString(); 
    String arryUsers[] = line.split(","); 
    id = arryUsers[0].trim(); 
    name = arryUsers[1].trim(); 

    outkey.set(id); 
    outval.set("A"+ name); 
    ouput.collect(outkey, outval); 
    } 
    } 

public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { 

private Text Outk = new Text(); 
private Text Outv = new Text(); 
String depid, dep ; 

public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

    String line = value.toString(); 
    String arryDept[] = line.split(","); 
    depid = arryDept[0].trim(); 
    dep = arryDept[1].trim(); 

    Outk.set(depid); 
    Outv.set("B" + dep); 

    output.collect(Outk, Outv); 
} 
    } 

和减速

ublic class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 

private Text tmp = new Text(); 
private ArrayList<Text> listA = new ArrayList<Text>(); 
private ArrayList<Text> listB = new ArrayList<Text>(); 

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException { 

    listA.clear(); 
    listB.clear(); 

    while (values.hasNext()) { 

     tmp = values.next(); 
     if (tmp.charAt(0) == 'A') { 
      listA.add(new Text(tmp.toString().substring(1))); 
     } else if (tmp.charAt(0) == 'B') { 
      listB.add(new Text(tmp.toString().substring(1))); 
     } 



    } 
    executejoinlogic(output); 

} 

private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException { 

    if (!listA.isEmpty() && !listB.isEmpty()) { 
     for (Text A : listA) { 
     for (Text B : listB) { 
     output.collect(A, B); 
     } 
     } 
     } 
    } 
      } 

是否有可能在上述场景中实现布隆过滤器?

如果是的话请帮我实施这个?

回答

0

只有当您的两个输入表中的一个比另一个输入表小得多时,才可以在此实现布隆过滤器。你需要在这里遵循的流程是:

  1. 初始化布隆过滤器在映射类的setup()方法(过滤器对象本身应该是全球性的,以便它可以通过map()方法后来被访问):

    filter = new BloomFilter(VECTOR_SIZE,NB_HASH,HASH_TYPE);

  2. 阅读较小的表进Mapper的setup()方法。

  3. 每个记录的ID添加到bloom滤波器:

    filter.add(ID);

  4. map()方法本身,可使用任何的ID filter.membershipTest(ID)从较大的输入源。如果没有匹配,您知道该ID不存在在你的小数据集,因此不应该被传递给减速。

  5. 请记住,你会得到误报在减速,所以不要以为一切都将加盟。
+1

原谅我复活一个老问题,但Ben的#5是不正确,我相信:我认为这是不可能的布隆过滤器的产生假阴性?错误**肯定**,是的,但不是错误的否定 – MattEdge