2

故事是这样的。我有一个kafka经纪人和一个特定的对象(我通过我的主题进行jsonify发送),它有一个我想用作密钥的ID。Spring Cloud Streams没有在消息中设置kafka密钥?

当前我正在使用'partitionKeyExtractorClass'配置来设置提取ID并将其作为关键字返回的类。

它看起来像这样:

def extractKey(Message<?> message) { 
    log.info('Extracting key from message') 
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id 
    log.info("Got = ${id}") 

    return id 
} 

我实际的问题是,当我浏览这个专题的消息,这对我的消息说,关键是空的ConsumerRecord ...

这是一个错误?难道我做错了什么?关于这方面的文档并没有比这更进一步。

回答

1

看,你在混合partitionkey

目前KafkaMessageChannelBinder不提供选项来确定keyMessage

只有现有的功能,你可以用有力的KafkaHeaders.MESSAGE_KEY

Object messageKey = this.messageKeyExpression != null 
      ? this.messageKeyExpression.getValue(this.evaluationContext, message) 
      : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY); 

所以,在output消息,你应该计算密钥并将其放入这个头。

+1

这不是一个可接受的解决方案。我有同样的问题。我使用原始模式输出。我不想在邮件中添加任何标题,但需要设置关键字。我不想切换到Apache骆驼。 –

+0

您的关注尚不明确。这是当前Kafka Binder实现的一个限制,其中mesageKey只是一个省略。尽管如此,请随时填写该问题。您可以配置Kafka Binder现在不要映射该标头,如果您关心这一点。我绝对不明白Apache Camel如何取代Spring Cloud Stream ... –

+1

问题是,除了Spring Cloud Streams之外,我正在将流与其他生产者和消费者进行整合。消息密钥对于确保在kafka主题上的顺序非常重要。 Spring Cloud Streams控制标题意味着所有其他生产者和消费者必须处理Spring Cloud Streams特定标题。我需要kafka消息正是我想要的,而不是Spring Cloud Streams想要的。一个简单的键和字符串消息是我需要的。原始模式给了我,但没有维护秩序的关键。 –

相关问题