2015-09-03 52 views

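Spring Integration Kafka - Sending a basic String

I'm trying to send a basic String payload using Spring Integration Kafka v1.2.1, but it fails with the following exception: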

2015-09-03 11:50:39.729 ERROR 14418 --- [task-executor-3] [         ] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer 

My XML configuration looks like this:

<task:executor id="schedule-request-task-executor" pool-size="5" keep-alive="120" queue-capacity="125"/> 

<bean id="kafkaStringSerializer" class="org.apache.kafka.common.serialization.StringSerializer"/> 

<int-kafka:producer-context id="schedule-request-producer-context"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration topic="schedule.requests" 
              key-serializer="kafkaStringSerializer" 
              value-serializer="kafkaStringSerializer" 
              broker-list="${kafka.brokers}"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

<int-kafka:outbound-channel-adapter 
     kafka-producer-context-ref="schedule-request-producer-context" 
     channel="schedule-request-channel"> 
    <int:poller receive-timeout="0" 
       fixed-delay="100" time-unit="MILLISECONDS" 
       task-executor="schedule-request-task-executor"/> 
</int-kafka:outbound-channel-adapter> 

And I send the message with the following code:

Message message = MessageBuilder.withPayload("PAYLOAD") 
       .setHeader("messageKey", "KEY") 
       .setHeader("topic", "schedule.requests") 
       .build(); 
scheduleRequestChannel.send(message); 

I looked at the samples at https://github.com/spring-projects/spring-integration-extensions/blob/master/samples/, but they appear to be out of date.


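I debugged this, and it looks like the value in the ProducerRecord has, for some reason, already been converted to byte[] by the time it is passed to the serializer. Not sure why.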

Answer


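After debugging the Spring Integration and Kafka classes, I found that this happens because Spring Integration converts the String to byte[] unless key-class-type and value-class-type are specified in the producer configuration. The StringSerializer then receives a byte[] instead of a String, which produces the SerializationException above.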
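To illustrate the mismatch without a broker, here is a minimal sketch in plain Java. The `Serializer` interface and `StringSerializer` class below are simplified stand-ins written for this example, not the Kafka classes, and `trySerialize` is a hypothetical helper. It assumes the real producer fails in a comparable way: a serializer typed for String is handed a byte[], the cast fails, and the failure is reported with a message like the one in the stack trace.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchSketch {

    // Simplified stand-in for a typed serializer (assumption: not the Kafka interface).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in serializer that only accepts String values.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical helper: calls the serializer through its raw type, so a value
    // of the wrong class only fails at runtime with a ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Serializer serializer, Object value) {
        try {
            byte[] bytes = serializer.serialize("schedule.requests", value);
            return "ok: " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            return "Can't convert value of class " + value.getClass().getName()
                    + " to class " + serializer.getClass().getName();
        }
    }

    public static void main(String[] args) {
        Serializer<String> serializer = new StringSerializer();

        // A String payload matches the serializer's type and is serialized.
        System.out.println(trySerialize(serializer, "PAYLOAD"));

        // A payload already converted to byte[] does not match; [B is the
        // JVM's class name for byte[], as seen in the exception message.
        byte[] converted = "PAYLOAD".getBytes(StandardCharsets.UTF_8);
        System.out.println(trySerialize(serializer, converted));
    }
}
```

The second call is the situation described above: the payload reaches the serializer as byte[] even though the sender built the message with a String.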

Here is the updated configuration, in case anyone is interested.

<int-kafka:producer-context id="schedule-request-producer-context"> 
     <int-kafka:producer-configurations> 
      <int-kafka:producer-configuration topic="schedule.requests" 
               key-class-type="java.lang.String" 
               key-serializer="kafkaStringSerializer" 
               value-class-type="java.lang.String" 
               value-serializer="kafkaStringSerializer" 
               broker-list="${kafka.brokers}"/> 
     </int-kafka:producer-configurations> 
</int-kafka:producer-context> 