2015-04-14 12 views
1

I am trying to submit a word count topology to my Storm cluster. I created a jar using Eclipse, but it throws an exception.

Can someone tell me what to do? I have attached my code and the exception below.

Spout creation -

public class WordReader implements IRichSpout { 
    private SpoutOutputCollector collecter; 
    private BufferedReader bufferedreader; 
    private FileReader filereader; 
    private Boolean completed=false; 
    private TopologyContext context; 
    private final static Logger logger=LoggerFactory.getLogger(WordReader.class); 
    @Override 
    public void ack(Object msgId) { 
     // TODO Auto-generated method stub 
     System.out.println("Ok"+msgId); 
    } 
    @Override 
    public void activate() { 
     // TODO Auto-generated method stub 
     logger.info("Activating Storm");   
    } 
    @Override 
    public void fail(Object msgId) { 
     // TODO Auto-generated method stub 
     System.out.println("Fail"+msgId); 
    } 
    @Override 
    public void nextTuple() { 
     // TODO Auto-generated method stub 
     if(completed) 
     { 
      try 
      { 
       Thread.sleep(1000); 
      } 
      catch(InterruptedException e) 
      { 
       System.out.println("String is Interrupted"); 
      } 
      return; // the file has already been consumed, so there is nothing more to emit
     } 
     String line; 
     bufferedreader=new BufferedReader(filereader); 
     try { 
      while((line=bufferedreader.readLine())!= null) 
      { 
       this.collecter.emit(new Values(line)); 
      } 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     finally 
     { 
      completed=true; 
     } 
    } 
    @Override 
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) { 
     // TODO Auto-generated method stub 
     this.context=context; 
     try { 
      this.filereader=new FileReader(map.get("InputFile").toString()); 
     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      throw new RuntimeException("Error reading file"); 
     } 
      this.collecter=collector;} 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 
     declarer.declare(new Fields("line")); 
    } 
} 
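
A note on the spout above: Storm's nextTuple() is expected to return quickly, and it is commonly written to emit at most one tuple per call, whereas the code above drains the whole file in a single call and rebuilds the BufferedReader each time. The following is a minimal sketch of the one-line-per-call pattern in plain Java, with no Storm dependency. The class name LineSourceSketch and the method nextLine() are hypothetical and only stand in for the spout's reading logic.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the spout's file-reading logic (not a Storm class).
public class LineSourceSketch {
    private final BufferedReader reader; // created once, not on every call
    private boolean completed = false;

    public LineSourceSketch(Reader source) {
        this.reader = new BufferedReader(source);
    }

    // Reads at most one line per call; returns null once the input is exhausted.
    public String nextLine() {
        if (completed) {
            return null;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
                reader.close();
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LineSourceSketch source = new LineSourceSketch(new StringReader("a b\nc d\n"));
        List<String> seen = new ArrayList<>();
        String line;
        while ((line = source.nextLine()) != null) {
            seen.add(line); // in a spout, this is where a single emit would happen
        }
        System.out.println(seen);
    }
}
```

In a real spout the reader would be created in open() and each nextTuple() call would emit the one line returned here, or return immediately when there is none.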

Bolt code -

public class WordNormalizer implements IRichBolt{ 
    private OutputCollector collecter; 
    @Override 
    public void cleanup() { 
     // TODO Auto-generated method stub 
    } 
    @Override 
    public void execute(Tuple input) { 
     // TODO Auto-generated method stub 
     String sentence=input.getStringByField("line"); 
     String[] words=sentence.split(" "); 
     for(String word:words) 
     { 
      word=word.trim(); 
      if(!word.isEmpty()) 
      { 
       word=word.toLowerCase(); 
       ArrayList a=new ArrayList(); 
       a.add(input); 
       this.collecter.emit(a,new Values(word)); 
      } 
     } 
     collecter.ack(input); // ack once per input tuple, after all of its words are emitted
    } 
    @Override 
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
      this.collecter=collector; // assign the parameter; the self-assignment left the field null
    } 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 
     declarer.declare(new Fields("word")); 
    } 
} 
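
The splitting and normalizing done in execute() can be checked on its own, outside Storm. This is a minimal sketch of that logic in plain Java; the class name NormalizeSketch and the method normalize() are hypothetical, and the list it returns stands in for the tuples the bolt would emit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the bolt's per-line logic (not a Storm class).
public class NormalizeSketch {
    // Splits on single spaces, trims, lowercases, and skips empty tokens.
    public static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Two consecutive spaces produce an empty token, which is skipped.
        System.out.println(normalize("Hello  Storm hello")); // prints [hello, storm, hello]
    }
}
```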

Bolt that counts word frequency -

public class WordCountBolt implements IRichBolt { 
    private OutputCollector collector; 
    private Integer id; 
    private String name; 
    private Map<String, Integer> counter; 
    @Override 
    public void cleanup() { 
     // TODO Auto-generated method stub 
     for(Map.Entry<String, Integer> entry : counter.entrySet()) 
     { 
      System.out.println(entry.getKey()+" "+entry.getValue()); 
     } 

    } 
    @Override 
    public void execute(Tuple input) { 
     // TODO Auto-generated method stub 
     String str=input.getStringByField("word"); 
     if(!counter.containsKey(str)) 
     { 
      counter.put(str, 1); 
     } 
     else 
     { 
      Integer i=counter.get(str)+1; 
      counter.put(str, i); 
     } 
    } 
    @Override 
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
     // TODO Auto-generated method stub 
     this.counter=new HashMap<String, Integer>(); 
     this.collector=collector; 
     this.name=context.getThisComponentId(); 
     this.id=context.getThisTaskId(); 
    } 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
} 
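
The counting step in execute() above is an ordinary map update. As a sketch under the assumption that the words arrive already normalized, the same bookkeeping can be written with Map.merge; the class name WordCountSketch and the method count() are hypothetical and not part of the topology.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the bolt's counter map (not a Storm class).
public class WordCountSketch {
    // Returns how many times each word occurs in the input.
    public static Map<String, Integer> count(List<String> words) {
        Map<String, Integer> counter = new HashMap<>();
        for (String word : words) {
            // Inserts 1 for a new word, otherwise adds 1 to the existing count.
            counter.merge(word, 1, Integer::sum);
        }
        return counter;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("hello", "storm", "hello"));
        System.out.println(counts.get("hello") + " " + counts.get("storm")); // prints 2 1
    }
}
```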

Main class that creates the topology -

public class StormMain { 
    public static void main(String[] args) 
    { 
     //Configuration 
     Config conf = new Config(); 
     conf.put("InputFile",args[0]); 
     conf.setDebug(false); 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("word-reader",new WordReader()); 
     builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); 
     builder.setBolt("word-counter", new WordCountBolt()).shuffleGrouping("word-normalizer"); 
     //Topology run 
     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1); 
     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("TopologyMain",conf,builder.createTopology()); 
     //Thread.sleep(1000); 
     //cluster.killTopology("TopologyMain"); 
     cluster.shutdown(); 
    } 
} 

Edit: exception

This is the exception I get:

 org.apache.storm.zookeeper.server.NIOServerCnxn - caught end of stream exception 
org.apache.storm.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x14cb812ae720003, likely client has closed socket 
    at org.apache.storm.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) ~[storm-core-0.9.3.jar:0.9.3] 
    at org.apache.storm.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-0.9.3.jar:0.9.3] 
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71] 
6056 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55127 which had sessionid 0x14cb812ae720003 
6057 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720005 
6076 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720005 closed 
6076 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55133 which had sessionid 0x14cb812ae720005 
6076 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 
6076 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 3b4d74c2-9fa3-4b8d-beb8-419063c95c02 
6077 [Thread-3] INFO backtype.storm.event - Event manager interrupted 
6077 [Thread-4] INFO backtype.storm.event - Event manager interrupted 
6078 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720007 
6097 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55139 which had sessionid 0x14cb812ae720007 
6097 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720007 closed 
6097 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 
6098 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor e94ee8a8-f38f-4ba4-a48f-4427a7c8d30d 
6098 [Thread-5] INFO backtype.storm.event - Event manager interrupted 
6098 [Thread-6] INFO backtype.storm.event - Event manager interrupted 
6099 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720009 
6117 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720009 closed 
6117 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55145 which had sessionid 0x14cb812ae720009 
6118 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 
6118 [main] INFO backtype.storm.testing - Shutting down in process zookeeper 
6118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method 
6119 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down 
6119 [main] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down 
6119 [main] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down 
6119 [main] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down 
6119 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop! 
6119 [SyncThread:0] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited! 
6119 [main] INFO org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete 
6120 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper 
6120 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8335008e-119b-4ae3-a557-2839d573a579 
6128 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989 
6130 [main] INFO backtype.storm.testing - Unable to delete file: C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989\version-2\log.1 
6130 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\1ea20791-599d-483d-9ffd-37445005684c 
6136 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\ef85048c-77e3-4392-8fc1-41bb4547ab53 
8027 [SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop! 
+0

It seems the connection to the Storm or ZooKeeper server was closed. Please confirm. – proutray

+0

I have checked the Storm and ZooKeeper connections, and they are running –

+0

Could you please try removing the 'cluster.shutdown();' part? – user2720864

Answers

-1

Instead of using LocalCluster, we have to use StormSubmitter to submit the topology. I used it and it works; now I can see my topology in the Storm UI.

cluster.submitTopology("TopologyMain",conf,builder.createTopology()); 

Replace the line above with:

StormSubmitter.submitTopology("TopologyMain",conf,builder.createTopology()); 
0

Just increase the length of time the thread sleeps, for example:

Thread.sleep(10000); 

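Set it to 10000 and then check. This is because your topology's tasks take a long time to run. Solve it by removing

cluster.shutdown();  

because it does not allow ZooKeeper to communicate with Storm, which is what this log line indicates:

[SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop! 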
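
The timing problem described here is not specific to Storm: work is handed to background threads, and shutting down right away can stop it before it runs. As a rough analogy only, using java.util.concurrent rather than any Storm API, the sketch below submits work and then waits a bounded time before forcing a shutdown. The class name ShutdownTimingSketch and the method process() are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: an executor stands in for the local cluster (not a Storm class).
public class ShutdownTimingSketch {
    // Hands the lines to a background thread, then waits up to waitMillis
    // for them to be processed before forcing a shutdown.
    public static List<String> process(List<String> lines, long waitMillis) {
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (String line : lines) {
            pool.execute(() -> processed.add(line)); // returns immediately
        }
        pool.shutdown(); // accept no new work, but keep what is already queued
        try {
            if (!pool.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // out of time: queued work is dropped
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("line 1", "line 2", "line 3");
        // With a generous wait, every line is processed before shutdown.
        System.out.println(process(lines, 10_000));
        // With no wait, some or all lines may be dropped, depending on timing.
        System.out.println(process(lines, 0).size() + " of " + lines.size());
    }
}
```

In the question's main method, the equivalent of the bounded wait is the commented-out Thread.sleep(...) placed between submitTopology and cluster.shutdown().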

1

I had a very similar problem, where the program got stuck while running.