2015-10-19

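Connection refused while trying to run a Storm topology to consume messages from Kafka

I have a Kafka broker running on my localhost. The ZooKeeper instance that ships with Kafka is also running on my localhost.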

I have created a simple topology that consumes messages from the Kafka broker. However, when I run the topology, I get a connection refused exception.

Below is my topology:

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaTopology
{
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String KAFKA_BOLT_ID = "kafkaBolt";

    public KafkaTopology() {
    }

    // Builds the spout configuration: ZooKeeper address, topic, ZK root and consumer id.
    private SpoutConfig constructKafkaSpoutConf()
    {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "topic2";
        String zkRoot = "/topic2";
        String consumerGroupId = "StormSpout";

        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

        return spoutConfig;
    }

    public void configureKafkaSpout(TopologyBuilder builder)
    {
        KafkaSpout kafkaSpout = new KafkaSpout(constructKafkaSpoutConf());
        builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout);
    }

    public void configureKafkaBolt(TopologyBuilder builder)
    {
        // KafkaBolt is neither imported nor defined in this post; it is
        // presumably the asker's own bolt class in the same package.
        KafkaBolt kafkaBolt = new KafkaBolt();
        builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).globalGrouping(KAFKA_SPOUT_ID);
    }

    private void buildAndSubmit() throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureKafkaBolt(builder);

        Config conf = new Config();
        conf.setDebug(true);

        // Submits to a running cluster: this call contacts Nimbus.
        StormSubmitter.submitTopology("kafka-processor",
                conf, builder.createTopology());
    }

    public static void main(String[] str) throws Exception
    {
        KafkaTopology kafkaTopology = new KafkaTopology();
        kafkaTopology.buildAndSubmit();
    }
}

The error I get is as follows:

Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused 
     at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:38) 
     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:116) 
     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70) 
     at udacity.storm.KafkaTopology.buildAndSubmit(KafkaTopology.java:66) 
     at udacity.storm.KafkaTopology.main(KafkaTopology.java:73) 
    Caused by: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused 
     at org.apache.thrift7.transport.TSocket.open(TSocket.java:183) 
     at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81) 
     at backtype.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:83) 
     at backtype.storm.security.auth.ThriftClient.<init>(ThriftClient.java:63) 
     at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:47) 
     at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:43) 
     at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:36) 
     ... 4 more 
    Caused by: java.net.ConnectException: Connection refused 
     at java.net.PlainSocketImpl.socketConnect(Native Method) 
     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
     at java.net.Socket.connect(Socket.java:589) 
     at org.apache.thrift7.transport.TSocket.open(TSocket.java:178) 
     ... 10 more 

http://stackoverflow.com/questions/22264893/storm-topology-failure-while-running-on-production

Answer


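The problem is that the code tries to connect to a live cluster: StormSubmitter.submitTopology contacts Nimbus, and the connection is refused, apparently because you are running locally and no Nimbus is listening. To run the topology on your local machine, submit it to a local cluster instead: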

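import backtype.storm.LocalCluster;

// In buildAndSubmit(), replace the StormSubmitter.submitTopology(...) call with:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-processor", conf, builder.createTopology());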
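
If you do intend to submit to a real cluster, it can help to first confirm that something is listening where the client expects Nimbus. The following is only a minimal sketch using plain java.net, not part of Storm: the class name PortCheck and the method isListening are hypothetical, and the host and port (localhost, 6627, which I believe is the default nimbus.thrift.port) are assumptions you should adjust to match your storm.yaml.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a Storm API): checks whether a TCP port accepts connections.
public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host: nothing reachable there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed values: Nimbus on localhost with the default Thrift port.
        String host = "localhost";
        int port = 6627;
        if (isListening(host, port, 2000)) {
            System.out.println("Something is listening on " + host + ":" + port);
        } else {
            System.out.println("Nothing is listening on " + host + ":" + port
                    + "; StormSubmitter would likely fail with Connection refused");
        }
    }
}
```

If the check reports that nothing is listening, either start Nimbus before submitting with StormSubmitter, or stay with the LocalCluster approach above for local testing.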