0
我在本地主机上运行了一个 Kafka broker,Kafka 自带的 ZooKeeper 也运行在我的本地主机上。尝试运行 Storm 拓扑以消费来自 Kafka 的消息时,连接被拒绝(Connection refused)。
我创建了一个简单的拓扑,用于消费来自 Kafka broker 的消息。但是,当我运行该拓扑时,出现了连接被拒绝的异常。
下面是我的拓扑
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
/**
 * Wires a {@code KafkaSpout} to a {@code KafkaBolt} and submits the resulting
 * topology through {@code StormSubmitter} under the name {@code "kafka-processor"}.
 */
public class KafkaTopology {

    /** Component id under which the Kafka spout is registered. */
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";

    /** Component id under which the bolt is registered. */
    private static final String KAFKA_BOLT_ID = "kafkaBolt";

    /** Creates the topology helper; it holds no instance state. */
    public KafkaTopology() {
    }

    /**
     * Builds the spout configuration from hard-coded values: hosts are looked up
     * via {@code ZkHosts("localhost:2181")}, and topic {@code "topic2"}, root
     * {@code "/topic2"} and id {@code "StormSpout"} are passed to {@code SpoutConfig}.
     *
     * @return the spout configuration handed to the {@code KafkaSpout}
     */
    private SpoutConfig constructKafkaSpoutConf() {
        final String topicName = "topic2";
        final String zookeeperRoot = "/topic2";
        final String spoutGroupId = "StormSpout";
        final BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        return new SpoutConfig(brokerHosts, topicName, zookeeperRoot, spoutGroupId);
    }

    /**
     * Registers a new {@code KafkaSpout} on the builder under {@link #KAFKA_SPOUT_ID}.
     *
     * @param builder topology builder that receives the spout
     */
    public void configureKafkaSpout(TopologyBuilder builder) {
        final SpoutConfig spoutSettings = constructKafkaSpoutConf();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutSettings));
    }

    /**
     * Registers a new {@code KafkaBolt} under {@link #KAFKA_BOLT_ID}, subscribed to
     * the spout with a global grouping.
     *
     * @param builder topology builder that receives the bolt
     */
    public void configureKafkaBolt(TopologyBuilder builder) {
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt())
                .globalGrouping(KAFKA_SPOUT_ID);
    }

    /**
     * Assembles spout and bolt, enables the debug setting on the topology config,
     * and submits the topology as {@code "kafka-processor"}.
     *
     * @throws Exception if building or submitting the topology fails
     */
    private void buildAndSubmit() throws Exception {
        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        configureKafkaSpout(topologyBuilder);
        configureKafkaBolt(topologyBuilder);

        final Config topologyConfig = new Config();
        topologyConfig.setDebug(true);

        // NOTE(review): submitTopology reaches Nimbus through NimbusClient (per the
        // stack trace reported with this code), so a Nimbus must be reachable —
        // confirm the cluster is running, or whether a local-mode run was intended.
        StormSubmitter.submitTopology(
                "kafka-processor", topologyConfig, topologyBuilder.createTopology());
    }

    /**
     * Entry point: builds the topology and submits it.
     *
     * @param str command-line arguments (not used)
     * @throws Exception if the topology cannot be built or submitted
     */
    public static void main(String[] str) throws Exception {
        new KafkaTopology().buildAndSubmit();
    }
}
运行后,我得到的异常如下:
Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:38)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:116)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70)
at udacity.storm.KafkaTopology.buildAndSubmit(KafkaTopology.java:66)
at udacity.storm.KafkaTopology.main(KafkaTopology.java:73)
Caused by: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
at org.apache.thrift7.transport.TSocket.open(TSocket.java:183)
at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81)
at backtype.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:83)
at backtype.storm.security.auth.ThriftClient.<init>(ThriftClient.java:63)
at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:47)
at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:43)
at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:36)
... 4 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.thrift7.transport.TSocket.open(TSocket.java:178)
... 10 more
http://stackoverflow.com/questions/22264893/storm-topology-failure-while-running-on-production –