2017-03-06 73 views
0

我使用spring-cloud-stream-schema来读取来自kafka的avro消息。我在MessagesChannels配置输入通道:春云kafka和avro序列化问题

@Input("topicName1") 
SubscribableChannel fromInput1(); 

我有这样的配置文件:

@Configuration 
@EnableBinding(MessagesChannels.class) 
@EnableSchemaRegistryClient 
public class MessageConfiguration { 
    @Bean 
    public MessageConverter topic1MessageConverter() throws IOException { 
    return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); 
    } 
} 

我的消费被称为与

fromInput1().subscribe(this::onMessage); 

void onMessage(Message message) { 
} 

当我实际发送消息我得到这个错误:

nested exception is java.lang.ClassCastException: 
org.apache.avro.generic.GenericData$Record cannot be cast to [B 

实际上原始字节被正确解析为org.apache.avro.generic.GenericData$Record。但春天需要Message班。如何将GenericData$Record转换为Message或如何将GenericData$Record直接转换为由avro-tools类生成?

更多细节:

2017-03-06 11:23:10.695 ERROR 19690 --- [afka-listener-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 7979, CreateTime = 1488784987569, checksum = 623709057, serialized key size = -1, serialized value size = 36, key = null, value = {"foor": "bar"}) 

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cl[email protected]4bf9d802]; nested exception is java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to [B 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) 

回答

1

我认为你需要设置contentType为使用application/*+avro作为传入消息通道指定here