2016-03-03

I am using Storm 0.10 and Kafka 0.9.0.0 with storm-kafka. Whenever I run my topology on the cluster, it starts reading from the beginning, even though I pass the zkRoot and the consumer groupId from a properties file: storm-kafka never creates its node in the ZooKeeper cluster.

kafka.zkHosts=myserver.myhost.com:2181 
kafka.topic=onboarding-mail-topic 
kafka.zkRoot=/kafka-storm 
kafka.group.id=onboarding 
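For reference, a minimal sketch of how such a properties file could be loaded into the `prop` object used below (the actual file name and loading code are not in the original post; here the same content is read from a string for illustration):

```java
import java.io.StringReader;
import java.util.Properties;

public class LoadKafkaProps {
    public static void main(String[] args) throws Exception {
        // In a real topology this would be prop.load(new FileInputStream("kafka.properties"));
        // the file name is an assumption for this sketch.
        String content =
                "kafka.zkHosts=myserver.myhost.com:2181\n"
              + "kafka.topic=onboarding-mail-topic\n"
              + "kafka.zkRoot=/kafka-storm\n"
              + "kafka.group.id=onboarding\n";
        Properties prop = new Properties();
        prop.load(new StringReader(content));
        System.out.println(prop.getProperty("kafka.zkRoot"));   // /kafka-storm
        System.out.println(prop.getProperty("kafka.group.id")); // onboarding
    }
}
```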

In the topology:

BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
String topicName = prop.getProperty("kafka.topic");
String zkRoot = prop.getProperty("kafka.zkRoot");
String groupId = prop.getProperty("kafka.group.id");

// kafka spout config
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

When I check ZooKeeper with ls / it doesn't show me kafka-storm:

[controller_epoch, controller, brokers, storm, zookeeper, kafka-manager, admin, isr_change_notification, consumers, config] 

Answer


Finally, I figured it out. Reading from Kafka and writing the offsets back are controlled by different settings.

If you run the topology on a Storm cluster, whether single-node or multi-node, make sure you have set the following in the storm.yaml file:

storm.zookeeper.servers 

storm.zookeeper.port 

properties, in addition to the zkHosts, zkRoot, and consumer group id.
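For example, the corresponding storm.yaml entries could look like this (the hostnames are placeholders, not values from the original post):

```yaml
storm.zookeeper.servers:
  - "zk1.myhost.com"
  - "zk2.myhost.com"
storm.zookeeper.port: 2181
```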

Or, the better practice is to override these properties in the topology by setting the correct values while creating the KafkaSpout, like this:

BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
String topicName = prop.getProperty("kafka.topic");
String zkRoot = prop.getProperty("kafka.zkRoot");
String groupId = prop.getProperty("kafka.group.id");
String kafkaServers = prop.getProperty("kafka.zkServers");
String zkPort = prop.getProperty("kafka.zkPort");

// kafka spout config
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

kafkaConfig.zkServers = Arrays.asList(kafkaServers);
kafkaConfig.zkPort = Integer.valueOf(zkPort);

KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

You can even put these values into the Config object. That is better, because you may want to store the offset information in some other ZooKeeper cluster while the topology reads messages from a completely different broker. For understanding:

@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
    _collector = collector;

    Map stateConf = new HashMap(conf);
    List<String> zkServers = _spoutConfig.zkServers;
    if (zkServers == null) {
        zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    }
    Integer zkPort = _spoutConfig.zkPort;
    if (zkPort == null) {
        zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    }
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
    _state = new ZkState(stateConf);

    _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

    // using TransactionalState like this is a hack
    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    if (_spoutConfig.hosts instanceof StaticHosts) {
        _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    } else {
        _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    }

KafkaSpout code snippet
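The fallback in open() above can be reduced to plain Java: values set on the spout config win; otherwise the topology's cluster-level ZooKeeper settings are used. A self-contained sketch of that logic, with the Storm classes replaced by plain maps and the config keys written out as strings for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetZkFallback {
    // Mirrors SpoutConfig.zkServers / SpoutConfig.zkPort: null means "not overridden".
    static List<String> spoutZkServers = null;
    static Integer spoutZkPort = null;

    public static void main(String[] args) {
        // Cluster-level config, as it would be read from storm.yaml.
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.zookeeper.servers", Arrays.asList("zk1.myhost.com"));
        conf.put("storm.zookeeper.port", 2181);

        // Same fallback as in KafkaSpout.open(): prefer the spout config,
        // else fall back to the cluster's ZooKeeper settings.
        List<String> zkServers = spoutZkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get("storm.zookeeper.servers");
        }
        Integer zkPort = spoutZkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get("storm.zookeeper.port")).intValue();
        }
        System.out.println(zkServers + ":" + zkPort); // [zk1.myhost.com]:2181
    }
}
```

Because spoutZkServers and spoutZkPort are null here, the sketch falls through to the cluster values; setting either field would make the spout write its offsets to that ZooKeeper instead.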