我试图用Kafka-Client库(0.9.0.1)测试Producer,Consumer。 代理(0.9.0.1)在服务器上运行,我已经测试KafkaProducer,没有问题。 但是当我测试KafkaConsumer进行轮询时,代理会发出错误消息。KafkaConsumer在KafkaServer(版本0.9.0.1)上出错
[2016年3月18日13:44:19129]错误关闭插座/172.26.132.149因为错误(kafka.network.Processor) kafka.common.KafkaException的:在错误卡夫卡请求类型10 .api.RequestKeys $ .deserializerForKey(RequestKeys.scala:57) at kafka.network.RequestChannel $ Request。(RequestChannel.scala:53) at kafka.network.Processor.read(SocketServer.scala:353) at kafka .network.Processor.run(SocketServer.scala:245)
消费者测试代码如下。
class ConsumerRunner implements Runnable{
private KafkaConsumer<String,String> consumer;
private String topic;
public ConsumerRunner(String topic,Properties props){
consumer = new KafkaConsumer<String,String>(props);
this.topic = topic;
consumer.subscribe(Arrays.asList(this.topic));
}
public void run() {
while(true){
ConsumerRecords<String,String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
}
我猜到轮询请求包含错误请求类型键,但是当我检查卡夫卡芯源,以及我relized请求类型键“10”被定义为“GroupCoordinatorKey”。我发现可疑代码在 'kafka.network.RequestChannel.scala'
val requestObj =
if (RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
RequestKeys.deserializerForKey(requestId)(buffer)
else
null
测试消费者还显示错误消息
java.io.EOFException的:空在 org.apache。 kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 在 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 在 org.apache.kafka.common。 network.KafkaChannel.receive(卡夫kaChannel.java:153) 在 org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) 在org.apache.kafka.common.network.Selector.poll(Selector.java:286 ) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)(ConsumerNetworkClient.java:213) 在 org.apache.kafk a.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180) 在 org.apache。 kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 在 com.medialog.mdt.kafka。 KafkaTest $ ConsumerThread.run(KafkaTest.java:61)
没有任何人有一个想法?这是我的问题吗?或其他人? 请帮帮我。 谢谢。
我已经使用新的客户API,因为您可以看到测试代码... –