我们正在升级我们的卡夫卡实现为.9和使用新的消费者java api来创建consumer.I使用下面的代码为消费者,我们正在使用设置主题给消费者在LINE A LINE B是对我们的服务的调用,它处理我们收到的消息。现在的问题是,如果我们的消息处理需要30秒以上,我们会收到异常。kafka升级到0.9与新的消费者API
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in while consuming messages");
}
例外是
2016年3月3日10:47:35.095 INFO 6448 --- [问调度-3] o.a.k.c.c.internals.AbstractCoordinator:标记协调2147483647死。 2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator:错误ILLEGAL_GENERATION在为组TEST-GROUP提交偏移时发生错误 CommitFailedException org.apache.kafka.clients。 consumer.CommitFailedException:提交无法完成,因为org.apache.kafka.clients.consumer.internals.ConsumerCoordinator上的组重新平衡 $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) at org.apache.kafka.clients.consumer。 incenals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) at org.apache.kafka.clients。 consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals。 RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient。 java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetwo rkClient.java:213)
上面的异常是在提交偏移时发生的。 任何建议将有助于谢谢
感谢您的回复返回给客户端,我尝试添加“request.timeout.ms”到70000,它给我的即使消息处理花了30000也是一样。 –
不知道为什么它没有起作用。我甚至尝试将“session.timeout.ms”设置为70000,但它给了我例外说“org.apache.kafka.common.errors.ApiException:会话超时不在可接受的范围内。”我在想你的第二个建议,但是如果在处理线程时出现一些问题,我会如何处理它?将它作为新消息再次添加到主题并在异常之前还原由消息引起的更改? –
增加group.max.session.timeout.ms以及 – Nautilus