2016-09-26 93 views
0

我正在使用春季卡夫卡1.0.3消费卡夫卡消息。 2在kafka中的主题和每个主题都有1个分区。 在java代码中,有2个@KafkaListener消费每个主题消息。 ConcurrentKafkaListenerContainerFactory的并发性设置为1.但消息有时会延迟20多秒。春季卡夫卡消息消费延迟

@Configuration 
@EnableKafka 
public class KafkaConfig { 

@Value("${kafka.brokers}") 
private String brokers; 

@Bean 
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> 
kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setConcurrency(1); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); 
    return factory; 
} 


@Bean 
public ConsumerFactory<String, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
} 

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "server-group"); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    return props; 
} 

@KafkaListenr代码如下:

@KafkaListener(id = "serverInChannel",topics = CommonContants.KAFKA_TOPIC.SERVER_IN_CHANNEL) 
public void consumeInMessage(@Payload String data, 
           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 
           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId, 
           Acknowledgment ack) { 
    logger.info("[serverInMessage]data=" + data); 
    ack.acknowledge(); 
} 

@KafkaListener(id = "webOutChannel",topics = CommonContants.KAFKA_TOPIC.WEB_OUT_CHANNEL) 
public void consumeOutMessage(@Payload String data, 
           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId, 
           Acknowledgment ack) { 
    logger.info("[webOutMessage]data=" + data); 
    ack.acknowledge(); 
} 

卡夫卡消息被弹簧卡夫卡发送和日志是在这里:

2016-09-26 16:16:42,777 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-26 16:16:44,101 [webOutChannel-0-kafka-listener-6] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:16:45,551 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-26 16:16:45,562 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-26 16:16:45,663 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0] 
2016-09-26 16:16:45,715 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-26 16:16:45,781 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-26 16:16:45,805 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:16:45,870 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:17:01,099 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-26 16:17:02,054 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-26 16:17:02,108 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-26 16:17:02,120 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0] 
2016-09-26 16:17:02,133 [serverInChannel-0-kafka-listener-5] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 

没有在日志中一些partitions assigned:[],我不知道为什么分配是空的。当webOutMessage消息在2016-09-26 16:16:45,870处消耗后,下一个动作是在2016-09-26 16:17:01,099的15+秒之后的消费者重新平衡,因此下一个消息在15+秒内延迟。

有人知道为什么吗?

添加调试日志,并延迟不是每次,有时是确定:

2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=179, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 179, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:33:58,623 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker 
2016-09-27 15:33:58,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0] 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {} 
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {} 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:14,599 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-27 15:34:14,599 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-27 15:34:15,276 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=174, metadata=''}} 
2016-09-27 15:34:15,286 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:34:15,516 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0] 
2016-09-27 15:34:15,516 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 3 records 
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=174, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 174, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=175, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 175, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=176, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 176, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}} 
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}} 
2016-09-27 15:34:15,795 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Woken up during commit 
2016-09-27 15:34:15,796 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 2 records 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=180, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 180, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=181, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 181, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}} 
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}} 
+0

为o.s.kafka和o.a.k.clients启用DEBUG日志记录以查看它是否提供更多信息。 –

+0

在问题结尾添加一些调试日志。 – tkec

回答

0

容器被正确反应,由经纪人发起的一个分区重新平衡。

您必须检查服务器日志以确定重新平衡的原因。