2017-10-18 156 views
0

我正在使用Storm版本1.1.0和卡夫卡版本0.10.1.2。卡夫卡喷口错误“消费者没有订阅任何主题或分配任何分区”

我创造卡夫卡壶嘴如下:

public KafkaSpout<String, String> getKafkaSpout() { 
    String _kafkaBrokers = (String) props.get("bootstrap.servers"); 
    String _topic = (String) props.get("kafka.topic.name"); 
    String groupId = (String) props.get("group.id"); 
    int maxMsgSize = (int) props.get("fetch.message.max.bytes"); 
    String keySerializer = (String) props.get("key.serializer"); 
    String valueSerializer = (String) props.get("value.serializer"); 

    List<String>topics = new ArrayList<String>(`enter code here`); 
    topics.add(_topic); 

    return new KafkaSpout<String, String (KafkaSpoutConfig.builder(_kafkaBrokers, topics) 
      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) 
      .setMaxUncommittedOffsets(100) 
      .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId) 
      .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxMsgSize) 
      .setProp("key.serializer",keySerializer) 
      .setProp("value.serializer",valueSerializer) 
      .build()) 
} 

我与我的项目已经下文提到的Maven依赖其他依赖相处下文提到的错误

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:973) 
at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:291) 
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:225) 
at org.apache.storm.daemon.executor$fn__9798$fn__9813$fn__9844.invoke(executor.clj:647) 
at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) 
at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) 

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka-client</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 

回答

0

我会假设List<String>topics = new ArrayList<String>("enter code here");是你的问题?您可能需要在该列表中输入您的主题名称。

您的依赖版本很奇怪,AFAIK Storm还没有发布任何与这些版本字符串。

我还想知道为什么你需要两个卡夫卡风暴卡夫卡客户端,这是卡夫卡> 0.10群集和风暴卡夫卡,这是为老卡夫卡群集(但仍然兼容最新的卡夫卡在此刻我认为)。

相关问题