2
故事是这样的。我有一个kafka经纪人和一个特定的对象(我通过我的主题进行jsonify发送),它有一个我想用作密钥的ID。Spring Cloud Streams没有在消息中设置kafka密钥?
当前我正在使用'partitionKeyExtractorClass'配置来设置提取ID并将其作为关键字返回的类。
它看起来像这样:
def extractKey(Message<?> message) {
log.info('Extracting key from message')
String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
log.info("Got = ${id}")
return id
}
我实际的问题是,当我浏览这个专题的消息,这对我的消息说,关键是空的ConsumerRecord ...
这是一个错误?难道我做错了什么?关于这方面的文档并没有比这更进一步。
这不是一个可接受的解决方案。我有同样的问题。我使用原始模式输出。我不想在邮件中添加任何标题,但需要设置关键字。我不想切换到Apache骆驼。 –
您的关注尚不明确。这是当前Kafka Binder实现的一个限制,其中mesageKey只是一个省略。尽管如此,请随时填写该问题。您可以配置Kafka Binder现在不要映射该标头,如果您关心这一点。我绝对不明白Apache Camel如何取代Spring Cloud Stream ... –
问题是,除了Spring Cloud Streams之外,我正在将流与其他生产者和消费者进行整合。消息密钥对于确保在kafka主题上的顺序非常重要。 Spring Cloud Streams控制标题意味着所有其他生产者和消费者必须处理Spring Cloud Streams特定标题。我需要kafka消息正是我想要的,而不是Spring Cloud Streams想要的。一个简单的键和字符串消息是我需要的。原始模式给了我,但没有维护秩序的关键。 –