2017-03-20

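Kafka Java consumer not reading data

I am trying to write a simple Java Kafka consumer that reads data, using code similar to https://github.com/bkimminich/apache-kafka-book-examples/blob/master/src/test/kafka/consumer/SimpleHLConsumer.java.

My application appears to connect successfully, but it does not fetch any data. Please advise.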

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
//import scala.util.parsing.json.JSONObject 
import scala.util.parsing.json.JSONObject; 

public class SimpleHLConsumer { 

    private final ConsumerConnector consumer; 
    private final String topic; 

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeper); 
     props.put("group.id", groupId); 
     // props.put("zookeeper.session.timeout.ms", "5000"); 
     // props.put("zookeeper.sync.time.ms", "250"); 
     // props.put("auto.commit.interval.ms", "1000"); 

     consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); 
     this.topic = topic; 
    } 

    public void testConsumer() { 
     Map<String, Integer> topicCount = new HashMap<>(); 
     topicCount.put(topic, 1); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); 
     System.out.println(consumerStreams); 
     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 
     System.out.println(streams); 
     System.out.println(consumer); 
     for (final KafkaStream stream : streams) { 
      ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
      System.out.println("for loop"); 
      System.out.println(it); 
      System.out.println("Message from Single Topic: " + new String(it.next().message())); 
      //System.out.println("Message from Single Topic: " + new String(it.message())); 
      while (it.hasNext()) { 
       System.out.println("in While"); 
       System.out.println("Message from Single Topic: " + new String(it.next().message())); 
      } 
     } 
     // if (consumer != null) { 
     //  consumer.shutdown(); 
     // } 
    } 

    public static void main(String[] args) { 
     String topic = "test"; 
     SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic); 
     simpleHLConsumer.testConsumer(); 
    } 

} 

Here is the output I see in Eclipse. It appears to connect to my ZooKeeper, but then it just hangs and does not display any messages.

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
SLF4J: The requested version 1.6 by your slf4j binding is not compatible with [1.5.5, 1.5.6] 
SLF4J: See http://www.slf4j.org/codes.html#version_mismatch for further details. 
{test=[testgroup kafka stream]} 
[testgroup kafka stream] 
[email protected] 
for loop 

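Were any new messages produced after the consumer program was started? If not, try adding 'props.put("auto.offset.reset", "smallest");' before creating the 'ConsumerConfig' instance, then rerun the program and check whether the messages are consumed. – amethystic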
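The suggestion in this comment can be sketched as below, using only java.util.Properties so that it runs without Kafka on the classpath. The class name OldConsumerProps and the method build are hypothetical names chosen for illustration; the property keys are those of the old ZooKeeper-based consumer used in the question. Note that auto.offset.reset only takes effect when the group has no committed offset yet, so a group id that has not been used before may be needed to see older messages.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that assembles the consumer settings.
final class OldConsumerProps {

    // Builds the properties that the question passes to new ConsumerConfig(props).
    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Start from the earliest available offset when the group has no committed offset.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "testgroup");
        // In the question's constructor this would be followed by:
        // consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        System.out.println(props);
    }
}
```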

Answer

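The consumer iterator's hasNext() is a blocking call. If no new message is available for consumption, it blocks indefinitely (consumer.timeout.ms defaults to -1, which means no timeout).

To verify this, change your code as follows: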

// Comment out the two lines below:
// System.out.println(it);
// System.out.println("Message from Single Topic: " + new String(it.next().message()));
// hasNext() below blocks: the code waits here until the next message arrives in the topic.
// Publish a new message to the topic with a producer and it will appear in the console.
while (it.hasNext()) {
    System.out.println("Message from Single Topic: " + new String(it.next().message()));
}

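A better approach is to run the consuming code in a separate thread. Use consumer.timeout.ms to specify a timeout in milliseconds; if no message arrives within that time, the consumer throws a ConsumerTimeoutException.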

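// Requires: import kafka.consumer.ConsumerTimeoutException;
// keepRunningThread is a flag (ideally volatile) that controls when to exit the consumer loop
while (keepRunningThread)
{
    try
    {
        if (it.hasNext())
        {
            System.out.println(new String(it.next().message()));
        }
    }
    catch (ConsumerTimeoutException ex)
    {
        // No message arrived within consumer.timeout.ms.
        // Wait for 5 (or t) seconds before checking for messages again.
        try
        {
            Thread.sleep(5000);
        }
        catch (InterruptedException ie)
        {
            // Restore the interrupt flag and leave the loop
            Thread.currentThread().interrupt();
            break;
        }
    }
}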
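
The shape of the loop above (a worker thread, a stop flag, and a bounded wait instead of an indefinite block) can be tried without a broker. The following is a minimal sketch under stated assumptions: a java.util.concurrent.BlockingQueue stands in for the Kafka stream, poll with a timeout plays the role of consumer.timeout.ms, and a null result takes the place of ConsumerTimeoutException. TimedConsumerLoop, stop, received and demo are hypothetical names; none of this is Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a consumer thread; the queue replaces the Kafka stream.
final class TimedConsumerLoop implements Runnable {
    private final BlockingQueue<String> source;
    private final long timeoutMs;                  // plays the role of consumer.timeout.ms
    private final List<String> received = new ArrayList<>();
    private volatile boolean keepRunning = true;   // checked on every iteration

    TimedConsumerLoop(BlockingQueue<String> source, long timeoutMs) {
        this.source = source;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public void run() {
        while (keepRunning) {
            try {
                // poll() gives up after timeoutMs instead of blocking forever.
                String message = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (message != null) {
                    synchronized (received) {
                        received.add(message);
                    }
                }
                // A null result means "timed out": loop around and re-check the flag.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        keepRunning = false;
    }

    List<String> received() {
        synchronized (received) {
            return new ArrayList<>(received);
        }
    }

    // Runs the loop on a worker thread, feeds it two messages, then shuts it down.
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        queue.add("world");
        TimedConsumerLoop loop = new TimedConsumerLoop(queue, 100);
        Thread worker = new Thread(loop, "consumer-loop");
        worker.start();
        try {
            // Wait (up to 5 seconds) until both messages have been consumed.
            long deadline = System.currentTimeMillis() + 5000;
            while (loop.received().size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(20);
            }
            loop.stop();      // the worker notices the flag after at most one timeout
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loop.received();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the wait is bounded, the worker re-checks the flag at least once per timeout, so stop() takes effect promptly even when no messages arrive. With an unbounded wait, as in the question's code, the thread would never get the chance to notice the flag.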