Storm ClassNotFoundException when submitting to a remote cluster
I have a very simple topology:
builder.setSpout("StreamSpout", new StreamSpout(), 1);
builder.setBolt("Bolt", new TestBolt(), 1).shuffleGrouping("StreamSpout");
StreamSpout:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.outputCollector = collector;
    this.config = conf;
    this.context = context;
}
public void nextTuple() {
    this.outputCollector.emit(new Values("test"));
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
TestBolt:
public void execute(Tuple input, BasicOutputCollector collector) {
    String word = input.getStringByField("word");
    Logger.getLogger(this.getClass()).info(word);
}
I submit it using the following code:
conf.put(Config.NIMBUS_HOST, "W.X.Y.Z"); // Cluster's IP
conf.put(Config.NIMBUS_THRIFT_PORT,6627);
conf.put(Config.STORM_ZOOKEEPER_PORT,2181);
System.setProperty("storm.jar", "/home/.../{FileName}-SNAPSHOT.jar"); // Generated JAR file path
StormSubmitter.submitTopology("de-linkS", conf, builder.createTopology()); // submitTopology is static
But whenever I submit my topology to the cluster, I get the following error in the worker log file:
2015-04-06 21:36:03 b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.RuntimeException: java.lang.ClassNotFoundException: upa.pfe.storm.spouts.StreamSpout
at backtype.storm.utils.Utils.deserialize(Utils.java:95) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:235) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
...
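The worker fails while deserializing the spout, so one thing worth checking is whether the JAR referenced by the storm.jar property actually contains upa.pfe.storm.spouts.StreamSpout. The following is only a minimal sketch of such a check using the standard java.util.jar API; the class name JarClassCheck and the method containsClass are hypothetical helpers introduced for illustration, not part of Storm, and the JAR path has to be supplied by the caller.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassCheck {

    /**
     * Returns true if the JAR at jarPath has an entry for the given
     * fully qualified class name (hypothetical helper, not a Storm API).
     */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // A class a.b.C is stored in a JAR as the entry a/b/C.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck <jar-path> <class-name>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]));
    }
}
```

It could be run with the path of the generated JAR as the first argument and upa.pfe.storm.spouts.StreamSpout as the second. A result of false would suggest the class was never packaged into the submitted JAR; a result of true would point the investigation elsewhere.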
However, everything works perfectly when I submit the topology to a local cluster.
Any idea what I might be doing wrong?
Thanks