2015-02-23

Synchronization issue while using Apache Storm

I am trying out Apache Storm to process a stream of GeoHash codes. I am using this library and Apache Storm 0.9.3. Details on geohash for Python can be found at the linked page.

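Currently, I am facing a synchronization issue in the execute method of one bolt class. With a single bolt instance I get the correct output, but the moment I go from one bolt instance to two or more, the output gets messed up.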

The code snippet for the bolt (the one that has the problem) is:

public static int PRECISION=6; 
private OutputCollector collector; 
BufferedReader br; 
String lastGeoHash="NONE"; 
HashMap<String,Integer> map; 
HashMap<String,String[]> zcd; 
TreeMap<Integer,String> counts=new TreeMap<Integer,String>(); 
public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
{ 
    String line=""; 
    this.collector = collector; 
    map=new HashMap<String,Integer>(); 
    zcd=new HashMap<String,String[]>(); 
    try { 
     br = new BufferedReader(new FileReader("/tmp/zip_code_database.csv")); 
     int i=0; 
     while ((line = br.readLine()) != null) { 
      if(i==0){ 
       String columns[]=line.split(","); 
       for(int j=0;j<columns.length;j++){ 
        map.put(columns[j],j); 
       } 
      }else{ 
       String []split=line.split(","); 
       zcd.put(split[map.get("\"zip\"")],new String[]{split[map.get("\"state\"")],split[map.get("\"primary_city\"")]}); 
      } 
      i++; 
     } 
     br.close(); 
    // System.out.println(zcd); 
    } catch (FileNotFoundException e) { 
     e.printStackTrace(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("Initialize"); 
    initializeTreeMapAsPerOurRequirement(counts); 
} 

public void execute(Tuple tuple) 
{ 
    // Each tuple from the spout carries one tab-separated record: geohash, count, zip
    String completeFile = tuple.getStringByField("string");
    String lines[]=completeFile.split("\t"); 
    String geohash=lines[0]; 
    int count=Integer.parseInt(lines[1]); 
    String zip=lines[2]; 
    String best=""; 
    String city=""; 
    String state=""; 

    if(!(geohash.equals(lastGeoHash)) && !(lastGeoHash.equals("NONE"))){ 
     //if(counts.size()!=0){ 
      //System.out.println(counts.firstKey()); 
       best=counts.get(counts.lastKey()); 
       //System.out.println(geohash); 
       if(zcd.containsKey("\""+best+"\"")){ 
        city = zcd.get("\""+best+"\"")[0]; 
        state = zcd.get("\""+best+"\"")[1]; 
        System.out.println(lastGeoHash+","+best+","+state+","+city+","+"US"); 
       }else if(!best.equals("NONE")){ 
        System.out.println(lastGeoHash); 
        city="MISSING"; 
        state="MISSING"; 
       } 
     //  initializeTreeMapAsPerOurRequirement(counts); 
      //}else{ 
       //System.out.println("else"+geohash); 
      //} 

     //} 
    } 
    lastGeoHash=geohash; 
    counts.put(count, zip); 

    collector.ack(tuple); 
} 

private void initializeTreeMapAsPerOurRequirement(TreeMap<Integer,String> counts){ 
    counts.clear(); 
    counts.put(-1,"NONE"); 
} 

public void declareOutputFields(OutputFieldsDeclarer declarer) 
{ 
    System.out.println("here"); 
    declarer.declare(new Fields("number")); 
} 

Topology code:

public static void main(String[] args) 
{ 
    TopologyBuilder builder = new TopologyBuilder(); 

    builder.setSpout("spout", new SendWholeFileDataSpout(),2); 
    builder.setBolt("map", new GeoHashBolt(),2).shuffleGrouping("spout"); 
    builder.setBolt("reduce",new GeoHashReduceBolt(),2).fieldsGrouping("map", new Fields("value")); 

    Config conf = new Config(); 

    LocalCluster cluster = new LocalCluster(); 
    cluster.submitTopology("test", conf, builder.createTopology()); 
    Utils.sleep(10000); 
    cluster.killTopology("test"); 
    cluster.shutdown(); 
} 

Could someone take a look at the code and give me some guidance?

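Have you set 'parallelism_hint' for the bolts? Please also show your topology code. – Shams 2015-02-23 12:54:49

@shizan I have added the topology code. I did not set the number of tasks per executor, since I did not think it would do my application any good. – 2015-02-24 04:43:22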

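Answers

You have set parallelism_hint to 2 for your spout and for both bolts. This means 2 executors run per component, and each executor works on its own copy of the bolt's fields (such as lastGeoHash and counts), so tuples belonging to the same geohash can end up in different instances and the output gets mixed up.
By setting parallelism_hint to 1 you can get the desired output.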
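
To illustrate why the output changes with two executors, here is a minimal sketch in plain Java with no Storm dependency. It is a simulation under stated assumptions, not the poster's actual bolt: `GroupingSketch`, `BoltInstance`, `run` and the sample records are hypothetical names and data, and Storm's random shuffle grouping is replaced by a deterministic round-robin. It compares spreading records across instances with routing every record for one geohash to the same instance, which is roughly what a `fieldsGrouping` on a geohash field would do in a real topology (that would require the upstream component to emit such a field).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupingSketch {

    // Hypothetical sample input in the same "geohash\tcount\tzip" layout the bolt parses.
    static final String[] SAMPLE = {
        "9q8yy\t5\t94103",
        "9q8yy\t9\t94107",
        "9q8yy\t2\t94110",
        "dr5ru\t7\t10001",
        "dr5ru\t3\t10002"
    };

    /** Stand-in for one bolt executor: it only sees the records routed to it. */
    static class BoltInstance {
        private final Map<String, Integer> bestCount = new TreeMap<>();
        private final Map<String, String> bestZip = new TreeMap<>();

        void execute(String geohash, int count, String zip) {
            Integer seen = bestCount.get(geohash);
            if (seen == null || count > seen) {
                bestCount.put(geohash, count);
                bestZip.put(geohash, zip);
            }
        }

        /** Lines this instance would report, one "geohash,zip" per geohash it saw. */
        List<String> report() {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<String, String> e : bestZip.entrySet()) {
                lines.add(e.getKey() + "," + e.getValue());
            }
            return lines;
        }
    }

    /**
     * Routes records to `parallelism` instances and returns all reported lines, sorted.
     * byKey = true  : route by hash of the geohash (like a fields grouping).
     * byKey = false : round-robin (a deterministic stand-in for shuffle grouping).
     */
    static List<String> run(String[] records, int parallelism, boolean byKey) {
        List<BoltInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new BoltInstance());
        }
        for (int i = 0; i < records.length; i++) {
            String[] f = records[i].split("\t");
            int target = byKey
                    ? Math.floorMod(f[0].hashCode(), parallelism)
                    : i % parallelism;
            instances.get(target).execute(f[0], Integer.parseInt(f[1]), f[2]);
        }
        List<String> out = new ArrayList<>();
        for (BoltInstance b : instances) {
            out.addAll(b.report());
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + run(SAMPLE, 2, false));
        System.out.println("by geohash:  " + run(SAMPLE, 2, true));
    }
}
```

With round-robin routing, each instance reports its own "best" zip for a geohash, so the same geohash shows up with conflicting answers. With key-based routing, every geohash is decided by exactly one instance, so a parallelism above 1 no longer changes the result. Setting the parallelism to 1, as suggested above, avoids the problem in a simpler way at the cost of throughput.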