2017-06-14 33 views
1

我已阅读其他线程,并通过使用新的组ID识别问题,但是我想了解可能导致此问题的原因。消费者卡在重新加入

我有一个主题与16个分区,我已经设置session.timeout.ms = 30000和max.poll.interval.ms = 30000000。

我运行我的程序,并按ctrl + c它,所以它没有正确关闭。我猜,16次之后,我陷入了这个重新加入的问题。 session.timeout.ms是心跳超时,所以30秒后它应该踢我的消费者权利,我的分区应该“释放”权利?还是只听我的max.poll.interval.ms?

编辑:我仍然间歇地得到这个错误,当它发生时,我必须重新启动我的所有消费者。即使我的消费者运行良好,然后他们开始都陷入重新加入的状态(没有消费者被添加/删除),这种情况也会发生。下面是当我尝试当它停留在该国一个新的消费后连接到它的错误日志:

https://pastebin.com/AXJeSHkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org[email protected]b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null) 

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected. 
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected 

这些都是我认为相关的第一个和最后的消息。以下是我设定的相关超时时间:

session.timeout.ms=30000 
max.poll.interval.ms=43200000  
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms 
enable.auto.commit=false 

我应该设置heartbeat.interval.ms吗?这是心跳在一些后台线程中自动发送给经纪人的时间间隔(我阅读过文档,但出于某种原因,我无法将我的头围绕在它周围)?

回答

1

如果您的客户端没有正确断开连接(崩溃或SIGINT),它将需要session.timeout.ms(在您的情况下为30秒)以便服务器将其从组中踢出。在此期间,服务器仍然会认为消费者是该组的一部分,因此不会进行任何重新分配。一旦这个延迟结束,分配的分区将被重新分配给其他消费者(如果有的话)。

这当然不会发生,如果您使用新的组ID。尽管每次开发时都会使用新的组(这是因为您不必等待),但您会失去前一组的任何已提交的偏移量,这可能不代表您的应用在生产中运行时的状态。

关于max.poll.interval.ms,它是消费者逻辑中2次调用poll()之间允许的最大延迟时间。我不认为这个设置与这个问题有关。

+0

恩,谢谢澄清。如果我等待的时间超过30秒(几乎至少10分钟才能重新加入),你知道还有什么其他超时吗? – kyl

+0

在这种情况下,我建议你打开log4j中的调试或跟踪,并检查客户端在做什么 –

+0

嗨,我已经更新了我的原始问题与日志和更多信息,如果你有任何意见/见解,将非常赞赏 – kyl