2017-07-04 93 views
0

我有一个使用Kafka的基于Spring Cloud Stream的微服务。Spring Cloud stream partitionKeyExpression错误计算

我用4个分区创建了一个kafka主题。

我配置在我YML以下内容:partitionKey可变

spring: 
    cloud: 
    stream: 
     bindings: 
     SYNC_TABLE: 
      content-type: application/json 
      partitionKeyExpression: payload.partitionKey 
      partitionCount: 4 
      destination: ${envTopicPrefix}.LEGACY_TABLE 

在我的代码有一个包含我的消息类(在其超类):

@Data 
@EqualsAndHashCode(callSuper=true) 
@ToString(callSuper=true) 
public class TransactionResponse extends GeneralOutputMessage{ 

} 

@Data 
@ToString 
public class GeneralOutputMessage { 

    private String operationType; 
    private List<String> affectedFields; 
    private Object data; 
    private String eventId; 
    private String eventName; 
    private String partitionKey; 
} 

我正在发送TransactionsResponse对象作为消息:

final TransactionResponse transactionResponse = handler.handleEvent(event); 
if (transactionResponse != null) { 
    outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build()); 
    log.info("Message Sent: {}", transactionResponse); 
} 

我的期望是春天的云流将采取关键payload.partitionKey,计算其hashCode()%4,并将事件发送到该分区。

但是,逻辑是完全随机的。下面是一些例子:

Math.abs( “111615631” .hashCode()%4)= 1。然而,在消息被发送给分区号3.

Math.abs( “110019882”。 hashCode()%4)= 2。但是,该消息被发送到分区号0.

Math.abs(“943152574”.hashCode()%4)= 0.此消息确实得到发送分区编号为0.

Math.abs(“943198862”.hashCode()%4)= 0.但是,消息被发送到分区号r 2.

我正在使用Dalston.SR1发行版。

我在这里错过了什么?

谢谢。

更新:

只是试图用相同的partitionKey(但略有不同的消息体)发送相同的事件。即使分区密钥相同,该消息也会转到两个不同的分区。看起来Spring Cloud Stream完全忽略了partitionKeyExpression。

回答

0

是我的错,我忘了补充生产者:在阳明部分:

spring: 
    cloud: 
    stream: 
     bindings: 
     SYNC_TABLE: 
      content-type: application/json 
      producer: 
      partitionKeyExpression: payload.partitionKey 
      partitionCount: 4 
      destination: ${envTopicPrefix}.LEGACY_TABLE 
相关问题