2017-09-01 36 views
1

我正在使用spring-kafka版本1.1.3来使用来自主题的消息。自动提交在消费者配置中设置为truemax.poll.records10。与服务器协商为012秒10秒。会话超时时卡夫卡监听器回滚事务

收到消息后,我将其中的一部分保存到数据库中。我的数据库往往是相当缓慢的,有时,它由卡夫卡听众导致会话超时:

自动偏移提交失败组MYGROUP:承诺无法完成 由于本集团已重新调整和分配的分区 另一位会员。这意味着后续调用 poll()之间的时间长于配置的session.timeout.ms,其中 通常意味着轮询循环花费太多时间处理消息 。您可以通过增加会话 超时或通过减少使用max.poll.records在poll() 中返回的批处理的最大大小来解决此问题。

既然不能增加服务器的会话超时和max.poll.records已经下跌10,我希望能够在包裹交易我的数据库调用,这将在的情况下回滚卡夫卡会话超时。

这是可能的,我该如何做到这一点?

不幸的是,我无法在文档中找到解决方案。

+0

为什么回滚数据库提交?如果您需要将消费者设置为手动提交,然后捕获异常。 – PragmaticProgrammer

+0

什么例外?以上信息仅被记录。据我所知,当卡夫卡会议超时时不会引发异常。 – meva

回答

0

决定升级到Kafka 0.11,因为它增加了事务支持(请参阅Release Notes)。

2

你必须考虑升级到春季卡夫卡1.2和卡夫卡0.10.x。老阿帕奇卡夫卡有一个心跳的缺陷。因此,如果使用autoCommit和慢听者,最终会出现意想不到的重新平衡,并且您将面临这样的问题。你使用Spring卡夫卡的版本有类似的逻辑:

// if the container is set to auto-commit, then execute in the 
// same thread 
// otherwise send to the buffering queue 
if (this.autoCommit) { 
    invokeListener(records); 
} 
else { 
    if (sendToListener(records)) { 
     if (this.assignedPartitions != null) { 
      // avoid group management rebalance due to a slow 
      // consumer 
      this.consumer.pause(this.assignedPartitions); 
      this.paused = true; 
      this.unsent = records; 
     } 
    } 
} 

那么,你可以考虑关掉autoCommit并依靠内置pause功能,在默认情况下开启。

+0

谢谢!我会尝试,并接受答案,如果它的工作。 – meva

+0

@meva,它适合你吗?接受答案让别人知道这是一个解决方案,这是非常好的方式 –