2016-03-18 13 views
1

我试图用Kafka-Client库(0.9.0.1)测试Producer,Consumer。 代理(0.9.0.1)在服务器上运行,我已经测试KafkaProducer,没有问题。 但是当我测试KafkaConsumer进行轮询时,代理会发出错误消息。KafkaConsumer在KafkaServer(版本0.9.0.1)上出错

[2016年3月18日13:44:19129]错误关闭插座/172.26.132.149因为错误(kafka.network.Processor) kafka.common.KafkaException的:在错误卡夫卡请求类型10 .api.RequestKeys $ .deserializerForKey(RequestKeys.scala:57) at kafka.network.RequestChannel $ Request。(RequestChannel.scala:53) at kafka.network.Processor.read(SocketServer.scala:353) at kafka .network.Processor.run(SocketServer.scala:245)

消费者测试代码如下。

class ConsumerRunner implements Runnable{ 
    private KafkaConsumer<String,String> consumer; 
    private String topic; 
    public ConsumerRunner(String topic,Properties props){ 
     consumer = new KafkaConsumer<String,String>(props); 
     this.topic = topic; 
     consumer.subscribe(Arrays.asList(this.topic)); 
    } 
    public void run() { 
     while(true){ 
      ConsumerRecords<String,String> records = consumer.poll(10000); 
      for (ConsumerRecord<String, String> record : records) 
       System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     } 
    } 

} 

我猜到轮询请求包含错误请求类型键,但是当我检查卡夫卡芯源,以及我relized请求类型键“10”被定义为“GroupCoordinatorKey”。我发现可疑代码在 'kafka.network.RequestChannel.scala'

val requestObj = 
     if (RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) 
     RequestKeys.deserializerForKey(requestId)(buffer) 

     else 
     null 

测试消费者还显示错误消息

java.io.EOFException的:空在 org.apache。 kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 在 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 在 org.apache.kafka.common。 network.KafkaChannel.receive(卡夫kaChannel.java:153) 在 org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) 在org.apache.kafka.common.network.Selector.poll(Selector.java:286 ) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)(ConsumerNetworkClient.java:213) 在 org.apache.kafk a.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180) 在 org.apache。 kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 在 com.medialog.mdt.kafka。 KafkaTest $ ConsumerThread.run(KafkaTest.java:61)

没有任何人有一个想法?这是我的问题吗?或其他人? 请帮帮我。 谢谢。

回答

0

Minkoo

不知道如果你只是想只是创造0.9卡夫卡代码或您的卡夫卡消息的消费者有具体的东西,是造成这个问题,你可以分享更多的细节。

但是,如果您只是想为0.9编写一个kafka消费者,那么在Kafka 0.9中就有新的消费者API。如果您打开使用新的客户API,请查看此示例的示例https://github.com/sdpatil/KafkaAPIClient/blob/master/src/main/java/com/spnotes/kafka/simple/Consumer.java

Sunil

+0

我已经使用新的客户API,因为您可以看到测试代码... –