2014-01-21 144 views
4

虽然我的第一次风暴拓扑的工作是基于阅读Apache的文件并进行处理,提交拓扑当我得到奇怪的错误:无效的拓扑异常错误提交拓扑

6044 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='apachelog') #<InvalidTopologyException InvalidTopologyException(msg:Component: [lineBolt] subscribes from non-existent component [line])> 
6051 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died 
backtype.storm.generated.InvalidTopologyException: null 
    at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:151) ~[storm-core-0.9.0.1.jar:na] 
    at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:287) ~[storm-core-0.9.0.1.jar:na] 
    at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopologyWithOpts(nimbus.clj:932) ~[storm-core-0.9.0.1.jar:na] 
    at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto__$reify__5541.submitTopology(nimbus.clj:950) ~[storm-core-0.9.0.1.jar:na] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65] 
    at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65] 
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.4.0.jar:na] 
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.4.0.jar:na] 
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:236) ~[storm-core-0.9.0.1.jar:na] 
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19) ~[storm-core-0.9.0.1.jar:na] 
    at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.0.1.jar:na] 
    at storm.starter.ApacheAccessLogTopology.main(ApacheAccessLogTopology.java:68) ~[classes/:na] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_65] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_65] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_65] 
    at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_65] 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) ~[idea_rt.jar:na] 

我的萌芽文件

public class LogReaderSpout extends BaseRichSpout 
{ 
    private SpoutOutputCollector _collector; 
    private String filePath = "access_log"; 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
    { 
     _collector = collector; 
    } 

    @Override 
    public void nextTuple() 
    { 
     Utils.sleep(100); 
     String line = null; 
     try 
     { 
      BufferedReader bufferReader = new BufferedReader(new FileReader(filePath)); 
      while((line = bufferReader.readLine()) != null) 
      { 
       if(line!=null) 
       { 
        System.out.println(line); 
        _collector.emit(new Values(line)); 
       } 
      } 

     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
     } 
     System.out.println("Emitting Next Tuple.."); 
    } 

    @Override 
    public void ack(Object id) 
    { 
     System.out.println("Ack with ID: "+id); 
    } 

    @Override 
    public void fail(Object id) 
    { 
     System.out.println("Fail with ID: "+id); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("line")); 
    } 
} 

拓扑

package storm.starter; 
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 
import backtype.storm.utils.Utils; 
import storm.starter.spout.LogReaderSpout; 

import java.util.Map; 

public class ApacheAccessLogTopology 
{ 
    public static class LineBolt extends BaseRichBolt 
    { 
     OutputCollector _collector; 

     @Override 
     public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
     { 
      _collector = collector; 
     } 

     @Override 
     public void execute(Tuple tuple) 
     { 
      //ALL PROCESSING will take place here on tuple(in our case Line here) 
      _collector.emit(tuple, new Values(tuple.getString(0) + "???")); 
      _collector.ack(tuple); 
     } 

     @Override 
     public void declareOutputFields(OutputFieldsDeclarer declarer) 
     { 
      declarer.declare(new Fields("line")); 
     } 


    } 

    public static void main(String[] args) throws Exception 
    { 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("lineSpout", new LogReaderSpout(),2); 
     builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("line"); 

     Config conf = new Config(); 
     conf.setDebug(true); 

     if (args != null && args.length > 0) { 
      conf.setNumWorkers(2); 

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 
     } 
     else { 

      LocalCluster cluster = new LocalCluster(); 
      cluster.submitTopology("apachelog", conf, builder.createTopology()); 
      Utils.sleep(10000); 
      cluster.killTopology("apachelog"); 
      cluster.shutdown(); 
     } 
    } 

} 
+0

对于任何遇到无效拓扑异常的人。如果这是你的拓扑结构,并且所有的名字都是正确的:'Spout1 - emit - > Bolt1 ----> Bolt2'。如果你的Bolt1没有发射任何元组,你会得到异常。 – Nav

回答

9

您的拓扑定义无效。你应该有:

builder.setBolt("lineBolt", new LineBolt(),2).shuffleGrouping("lineSpout"); 
+0

你说得对。你能解释为什么这有效吗?据我了解到,Shuffle意味着根据参数中提到的输入对数据进行分组。这里讲的是什么? – Volatil3

+0

@ Volatil3你需要告诉螺栓它的数据源。否则,它应该如何知道要处理什么?在你的情况下,螺栓无法获取其数据,因为没有名为'line'的喷口 – Chiron

+0

啊..所以* lineSpout *是引用正在消耗的源的别名。谢谢 – Volatil3

2

面对同样的问题,如上面和下面是我的例外

Topology submission exception. (topology name='Getting-Started-Toplogie') <InvalidTopologyException InvalidTopologyException(msg:Component: [Record-normalizertt] subscribes from non-existent stream: [default] of component [Rreader])> 

这里是我的解决方案这个工作给我。

在喷嘴或螺栓类中的方法declareOutputFields(OutputFieldsDeclarer声明器)中声明以下语句。

declarer.declare(new Fields("line")); 

最后你的方法应该如下在你的喷口或螺栓类。

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 
    declarer.declare(new Fields("line")); 

    }