2016-10-28 50 views

Kafka Streams from JSON to Avro

I am trying to use Kafka Streams to convert a topic with String/JSON messages into another topic with Avro messages.

The stream's main method:

// Default serdes: String keys, generic Avro values.
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

final KStreamBuilder builder = new KStreamBuilder();

final Serde<String> stringSerde = Serdes.String();

// Read String/JSON values from "testin", convert each one to an Avro record,
// and write the result to "testout" using the default (Avro) value serde.
builder.stream(stringSerde, stringSerde, "testin")
    .mapValues(value -> AvroUtil.transform(value))
    .to("testout");

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

The transformation:

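import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public static GenericRecord transform(Object value) {

    // ... parse string/json and generate Avro object

    // Placeholder schema: a record with a single string field "f1".
    String userSchema = "{\"type\":\"record\"," +
        "\"name\":\"myrecord\"," +
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(userSchema);

    // Build a generic record against that schema and fill in the field.
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1", "value1");

    return avroRecord;
}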

And I get an exception like this:

Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonProperty.access()Lcom/fasterxml/jackson/annotation/JsonProperty$Access; 
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findPropertyAccess(JacksonAnnotationIntrospector.java:229) 
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:545) 
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:542) 
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.fromMemberAnnotationsExcept(POJOPropertyBuilder.java:996) 
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.findAccess(POJOPropertyBuilder.java:542) 
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.removeNonVisible(POJOPropertyBuilder.java:623) 
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._removeUnwantedAccessor(POJOPropertiesCollector.java:697) 
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:298) 
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169) 
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:222) 
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:355) 
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210) 
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153) 
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203) 
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157) 
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481) 
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679) 
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107) 
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559) 
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927) 
at io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest.toJson(RegisterSchemaRequest.java:76) 
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232) 
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224) 
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219) 
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:58) 
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:90) 
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) 
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) 

Is this the right approach? I am new to Kafka Streams and Avro.

Answer


It was just the Jackson dependencies that were missing:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.7.8</version>
</dependency>

Now it works.
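
For anyone who hits the same NoSuchMethodError: an error like this usually means that the class found at runtime comes from an older JAR than the one the calling code was compiled against. As far as I know, JsonProperty.access() only exists in jackson-annotations 2.6 and later, so an older jackson-annotations on the classpath next to a newer jackson-databind would fit the stack trace above. Below is a minimal sketch, using only JDK reflection, for checking where a class is loaded from, whether it has a given method, and how many copies of it are on the classpath. ClasspathProbe and its method names are hypothetical helpers written for illustration; they are not part of Kafka, Avro, or Jackson.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic helper; not part of Kafka, Avro, or Jackson. */
final class ClasspathProbe {

    private ClasspathProbe() {}

    /** Returns the JAR or directory a class was loaded from, or a placeholder text. */
    static String locationOf(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "(no code source)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    /** Returns true if the class can be loaded and has a public method with this name. */
    static boolean hasMethod(String className, String methodName) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return Arrays.stream(type.getMethods())
                    .anyMatch(method -> method.getName().equals(methodName));
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Lists every copy of the class file visible to the class loader. */
    static List<URL> copiesOf(String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(ClasspathProbe.class.getClassLoader().getResources(resource));
        } catch (IOException e) {
            return Collections.emptyList();
        }
    }
}
```

Calling ClasspathProbe.locationOf("com.fasterxml.jackson.annotation.JsonProperty") and ClasspathProbe.hasMethod("com.fasterxml.jackson.annotation.JsonProperty", "access") at the start of the streams application should show which JAR supplies the annotation and whether it is recent enough. If copiesOf returns more than one URL, two versions of the same artifact are probably on the classpath.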


You may want to accept your own answer.
