我有一个使用Kafka的基于Spring Cloud Stream的微服务。Spring Cloud stream partitionKeyExpression错误计算
我用4个分区创建了一个kafka主题。
我配置在我YML以下内容:partitionKey可变
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE
在我的代码有一个包含我的消息类(在其超类):
@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{
}
@Data
@ToString
public class GeneralOutputMessage {
private String operationType;
private List<String> affectedFields;
private Object data;
private String eventId;
private String eventName;
private String partitionKey;
}
我正在发送TransactionsResponse对象作为消息:
final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
log.info("Message Sent: {}", transactionResponse);
}
我的期望是春天的云流将采取关键payload.partitionKey,计算其hashCode()%4,并将事件发送到该分区。
但是,逻辑是完全随机的。下面是一些例子:
Math.abs( “111615631” .hashCode()%4)= 1。然而,在消息被发送给分区号3.
Math.abs( “110019882”。 hashCode()%4)= 2。但是,该消息被发送到分区号0.
Math.abs(“943152574”.hashCode()%4)= 0.此消息确实得到发送分区编号为0.
Math.abs(“943198862”.hashCode()%4)= 0.但是,此消息被发送到分区号r 2.
我正在使用Dalston.SR1发行版。
我在这里错过了什么?
谢谢。
更新:
只是试图用相同的partitionKey(但略有不同的消息体)发送相同的事件。即使分区密钥相同,该消息也会转到两个不同的分区。看起来Spring Cloud Stream完全忽略了partitionKeyExpression。