1

我正在建设一个春季卡夫卡消费者。我已经设置了重试机制。重试完毕后,我想将失败的消息推送给死信主题。Kafka Consumer - 收听方收到的参数恢复方法

Listen方法具有以下参数

public void listen(@Payload Map<String, Object> conciseMap, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     Acknowledgment ack) throws JsonProcessingException { 

作为恢复方法的一部分,我想取conciseMap输入到听者的地图或者是由我的主题接收到的原始信息传递。有没有办法做到这一点?

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(conncurrency); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRetryTemplate(retryTemplate()); 
    factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge(); 

      return null; 
     } 
    }); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

回答

1

你不能得到,在RecoveryCallbackRetryContext转换conciseMap,但你可以检索ConsumerRecord是从主题原始转换前:

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD) 
+0

由于阿尔乔姆。 ConsumerRecord.value是否提供我们在侦听器方法中获得的字节? –

+0

M-m-m。可能。这是提供'Deserializer'的结果 –