2017-05-19

I'm running into a problem with Kafka and Storm. At this point I'm not sure if it's a problem with the KafkaSpout config I'm setting up, or if I'm not acking correctly, or what. Why is Apache Storm's KafkaSpout emitting so much from the Kafka topic?

I queued 50 items onto my Kafka topic, but my spout has emitted over 1300 tuples (and counting). Also, the spout reports almost all of them as "failed". The topology doesn't actually fail, it writes to the database successfully, but I don't know why it is apparently replaying everything so many times (if that's what it's doing).

The big question is:

Why is it emitting so many tuples when I only passed 50 into Kafka?


Here is how I set up the topology and the KafkaSpout:

public static void main(String[] args) {
    try {
        String databaseServerIP = "";
        String kafkaZookeepers = "";
        String kafkaTopicName = "";
        int numWorkers = 1;
        int numAckers = 1;
        int numSpouts = 1;
        int numBolts = 1;
        int messageTimeOut = 10;
        String topologyName = "";

        // Check length before indexing, so an empty args array can't throw
        if (args == null || args.length == 0) {
            System.out.println("Args cannot be null or empty. Exiting");
            return;
        } else {
            if (args.length == 8) {
                for (String arg : args) {
                    if (arg == null) {
                        System.out.println("Parameters cannot be null. Exiting");
                        return;
                    }
                }
                databaseServerIP = args[0];
                kafkaZookeepers = args[1];
                kafkaTopicName = args[2];
                numWorkers = Integer.valueOf(args[3]);
                numAckers = Integer.valueOf(args[4]);
                numSpouts = Integer.valueOf(args[5]);
                numBolts = Integer.valueOf(args[6]);
                topologyName = args[7];
            } else {
                System.out.println("Bad parameters: found " + args.length + ", required = 8");
                return;
            }
        }

        Config conf = new Config();

        conf.setNumWorkers(numWorkers);
        conf.setNumAckers(numAckers);
        conf.setMessageTimeoutSecs(messageTimeOut);

        conf.put("databaseServerIP", databaseServerIP);
        conf.put("kafkaZookeepers", kafkaZookeepers);
        conf.put("kafkaTopicName", kafkaTopicName);

        /**
         * Now would put kafkaSpout instance below instead of TemplateSpout()
         */
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
        builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts)
                .shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");

        StormTopology topology = builder.createTopology();

        StormSubmitter.submitTopology(topologyName, conf, topology);

    } catch (Exception e) {
        System.out.println("There was a problem starting the topology. Check parameters.");
        e.printStackTrace();
    }
}

private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {

    //String topic = "FLAT-ITEMS";
    String zkNode = "/" + topic + "-subscriber-pipeline";
    String zkSpoutId = topic + "subscriberpipeline";
    KafkaTopicInZkCreator.createTopic(topic, zkHosts);

    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
    //spoutConfig.startOffsetTime = System.currentTimeMillis();
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    return new KafkaSpout(spoutConfig);
}

And here is where the topic gets created, in case it matters:

public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {
        int sessionTimeOutInMs = 15 * 1000; // 15 secs
        int connectionTimeOutInMs = 10 * 1000; // 10 secs

        zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
        zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

        int noOfPartitions = 1;
        int noOfReplication = 1;
        Properties topicConfiguration = new Properties();

        boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
        if (!topicExists) {
            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

Answer


You need to look at whether the messages are failing in the bolt.

If they are all failing, you are probably not acking the message in the bolt, or there is an exception in the bolt code.

If the bolt messages are being acked, it is more likely a timeout. Increasing the topology timeout config or the parallelism should solve the problem.
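The ack-on-success / fail-on-exception pattern described above can be sketched like this. It is a minimal standalone illustration: the tiny `Tuple` and `OutputCollector` classes below are hypothetical stand-ins for Storm's real `org.apache.storm` types (as is `writeToDatabase`), so the shape of `execute()` can run without a cluster; only that shape is the point.

```java
import java.util.ArrayList;
import java.util.List;

public class AckPatternSketch {
    // Stand-in for org.apache.storm.tuple.Tuple
    static class Tuple {
        final String value;
        Tuple(String value) { this.value = value; }
    }

    // Stand-in for org.apache.storm.task.OutputCollector
    static class OutputCollector {
        final List<String> acked = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
        void ack(Tuple t)  { acked.add(t.value); }
        void fail(Tuple t) { failed.add(t.value); }
    }

    // The shape a BaseRichBolt's execute() should have:
    // ack on success, fail (not swallow) on error.
    static void execute(Tuple input, OutputCollector collector) {
        try {
            writeToDatabase(input.value); // the bolt's real work
            collector.ack(input);         // tells the spout this tuple is done
        } catch (Exception e) {
            collector.fail(input);        // triggers a controlled replay
        }
    }

    // Hypothetical work method; fails on a bad item.
    static void writeToDatabase(String value) {
        if (value == null) throw new IllegalArgumentException("bad item");
    }

    public static void main(String[] args) {
        OutputCollector collector = new OutputCollector();
        execute(new Tuple("item-1"), collector);
        execute(new Tuple(null), collector);
        System.out.println(collector.acked.size() + " acked, "
                + collector.failed.size() + " failed"); // prints "1 acked, 1 failed"
    }
}
```

If `execute()` never calls `ack()`, every tuple eventually hits the message timeout and is replayed by the spout, which matches the symptom of 50 queued items producing 1300+ emits.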

Thanks. What is the correct way to ack in the bolt? And how do I increase the topology timeout? – markg

@markg If you are using BaseBasicBolt, you don't need to handle the ack yourself. If you use BaseRichBolt, you should call ack() in the execute method. – Solo

@markg The topology timeout is the "topology.message.timeout.secs" config; you can set it in the topology code or in the cluster's storm.yaml. – Solo
