2017-02-25 50 views
1

我正在编写一个mapreduce程序,其中reduce函数接收作为输入值的PageRankNode(具有两个字段)对象的迭代并将它添加到优先级队列中。在遍历每个对象并将其添加到优先级队列时,结果优先级队列只包含我添加的最后一个对象。 但是,当我创建一个相同类型的新对象并添加到优先级队列中时,它似乎能够按预期工作。优先级队列添加新对象vs添加已创建

我想知道为什么会发生这种情况? 下面的示例作品。然而,而不是“topPages.add(新PageRankNode(pageNode.pageName,pageNode.pageRank))”,我使用“topPages.add(pageNode)”它不按预期工作。

下面还添加了优先级队列的比较器实现。

private Comparator<PageRankNode> comparator= new PageNodeComparator(); 
    private PriorityQueue<PageRankNode> topPages= new PriorityQueue<PageRankNode>(100,comparator); 

public void reduce(NullWritable key,Iterable<PageRankNode> pageNodes,Context context) throws IOException,InterruptedException{ 
    for(PageRankNode pageNode:pageNodes){ 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName,pageNode.pageRank)); 
     if(topPages.size()>100){ 
      topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()){ 
     pageNode=topPages.poll(); 
     context.write(NullWritable.get(),new Text(pageNode.pageName+":"+pageNode.pageRank)); 
    } 

} 
public class PageNodeComparator implements Comparator<PageRankNode>{ 

    public int compare(PageRankNode x,PageRankNode y){ 
     if(x.pageRank < y.pageRank){ 
      return -1; 
     } 
     if(x.pageRank > y.pageRank){ 
      return 1; 
     } 
     return 0; 
    } 
} 

回答

1

我不认为你提供足够的信息来正确诊断此。我发现reduce方法中有InterruptedException这个方法表明你可能在多线程上运行这个函数 - 如果是这样的话,可能是潜在的原因。

我写了一个小程序,其功能相同,输出如预期。

import java.util.Arrays; 
import java.util.Comparator; 
import java.util.PriorityQueue; 

public class Main { 
    private static Comparator<PageRankNode> comparator = new PageNodeComparator(); 
    private static PriorityQueue<PageRankNode> topPages = new PriorityQueue<PageRankNode>(100, comparator); 

    public static void main(String[] args) { 
    reduce(Arrays.asList(
     new PageRankNode("A", 1000), 
     new PageRankNode("B", 1500), 
     new PageRankNode("C", 500), 
     new PageRankNode("D", 700), 
     new PageRankNode("E", 7000), 
     new PageRankNode("F", 60) 
    )); 
    } 

    public static void reduce(Iterable<PageRankNode> pageNodes) { 
    for(PageRankNode pageNode : pageNodes) { 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName, pageNode.pageRank)); 
     if(topPages.size() > 100) { 
     topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()) { 
     pageNode = topPages.poll(); 
     System.out.println(pageNode.pageName); 
    } 
    } 

    public static class PageRankNode { 
    private String pageName; 
    private int pageRank; 

    public PageRankNode(String pageName, int pageRank) { 
     this.pageName = pageName; 
     this.pageRank = pageRank; 
    } 
    } 

    public static class PageNodeComparator implements Comparator<PageRankNode> { 

    @Override 
    public int compare(PageRankNode x, PageRankNode y) { 
     if(x.pageRank < y.pageRank) { 
     return -1; 
     } 
     if(x.pageRank > y.pageRank) { 
     return 1; 
     } 
     return 0; 
    } 
    } 
} 

输出是:

F 
C 
D 
A 
B 
E