2017-09-26 44 views
-1

我正在类转换异常而消息经过卡夫卡流API,类转换异常而消息经过卡夫卡流API

例外是:

java.lank/ClassCastException异常:[B不能转换为com.fasterxml.jackson.databind.JsonNode

我流的代码是:

public static void main(String[] args) { 
    Properties config = new Properties(); 
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API"); 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "samsmembershipkafka.dev.cloud.wal-mart.com:9092"); 
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
      jsonDeserializer); 

    /* 
    * config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde); 
    * config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
    * jsonSerde); 
    */ 

    // Building Stream 
    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, JsonNode> textlines = builder.stream("MainTopic"); 

    Predicate<String, JsonNode> isAddComment = (key, value) -> value 
      .get("header").toString().contains("/addComment"); 
    Predicate<String, JsonNode> is1M1C = (key, value) -> value 
      .get("header").toString().contains("/1Member1Account"); 
    Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=STOLEN") || value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=LOST")); 

    KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment, 
      is1M1C, isLostOrStolen); 

    topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic"); 
    topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic"); 
    topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic"); 

    KafkaStreams streams = new KafkaStreams(builder, config); 

    streams.start(); 
} 
+0

什么是完整的堆栈跟踪?似乎你在某个地方使用了错误的Serde。从你的代码片段中,我会假设你在阅读主题'builder.stream(stringSerde,jsonSerde,“MainTopic”)时需要指定JSON-Serde;' - 我想你需要为其他操作指定正确的serdes 。仔细查看堆栈跟踪,找出哪个操作员抛出异常。 –

+1

是的,这是问题,现在工作,谢谢 – user8677554

+0

把我的评论作为答案,所以你可以标记问题为答案。 –

回答

0

似乎你在某个地方使用了错误的Serde。从你的代码片段,我会假设你阅读的话题时,需要指定JSON-SERDE:

builder.stream(stringSerde, jsonSerde, "MainTopic"); 

我猜你会需要进行其他操作指定正确的Serde S,太。仔细查看堆栈跟踪,找出哪个操作员抛出异常

相关问题