2016-08-03 97 views
2

我一直在试图找到春季启动卡夫卡骆驼Avro消费者没有运气的示例代码。我发现春天骆驼卡夫卡消费者和生产者的样品在以下网址:春季开机卡夫卡骆驼Avro消费者

https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/

但什么是缺少的是Avro的一部分。我在看骆驼文档Avro的位置:

http://camel.apache.org/avro.html

我的具体问题是,一旦我的豆从Avro公司架构创建和我有POJO类,我怎么告诉骆驼春天例如上述用户Avro序列化?具体来说,我指这行代码: 从( “海边的卡夫卡:本地主机:9092主题=测试& zookeeperHost =本地主机& zookeeperPort = 2181 &的groupId =组别1 & serializerClass = kafka.serializer.StringEncoder”)。豆(kafkaOutputBean.class );

其中串行器是StringEncoder。我如何告诉Camel使用Avro序列化?

+0

不会'serializerClass = ...'放在哪里?例如'&serializerClass = avro.serializer.StringEncoder' – jny

回答

0

我找到了我自己的答案。因此,我想与你分享。它实际上是serializerClass=org.springframework.integration.kafka.serializer.avro.AvroSerializer。代码非常简单,你可以编写自己的代码。

public class AvroSerializer<T> { 

     public T deserialize(final byte[] bytes, final DatumReader<T> reader) throws IOException { 
      final Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); 
      return reader.read(null, decoder); 
     } 

     public byte[] serialize(final T input, final DatumWriter<T> writer) throws IOException { 
      final ByteArrayOutputStream stream = new ByteArrayOutputStream(); 

      final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null); 
      writer.write(input, encoder); 
      encoder.flush(); 

      return stream.toByteArray(); 
     } 
    }