-1
我正在类转换异常而消息经过卡夫卡流API,类转换异常而消息经过卡夫卡流API
例外是:
java.lank/ClassCastException异常:[B不能转换为com.fasterxml.jackson.databind.JsonNode
我流的代码是:
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"samsmembershipkafka.dev.cloud.wal-mart.com:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final Serde<String> stringSerde = Serdes.String();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);
/*
* config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde);
* config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
* jsonSerde);
*/
// Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> textlines = builder.stream("MainTopic");
Predicate<String, JsonNode> isAddComment = (key, value) -> value
.get("header").toString().contains("/addComment");
Predicate<String, JsonNode> is1M1C = (key, value) -> value
.get("header").toString().contains("/1Member1Account");
Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value
.get("header").toString()
.contains("/changeCardStatus?action=STOLEN") || value
.get("header").toString()
.contains("/changeCardStatus?action=LOST"));
KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment,
is1M1C, isLostOrStolen);
topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic");
topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic");
topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
什么是完整的堆栈跟踪?似乎你在某个地方使用了错误的Serde。从你的代码片段中,我会假设你在阅读主题'builder.stream(stringSerde,jsonSerde,“MainTopic”)时需要指定JSON-Serde;' - 我想你需要为其他操作指定正确的serdes 。仔细查看堆栈跟踪,找出哪个操作员抛出异常。 –
是的,这是问题,现在工作,谢谢 – user8677554
把我的评论作为答案,所以你可以标记问题为答案。 –