Debezium连接器的Kafka连接事件是Avro编码。无法读取Kafka主题avro消息
在传递给Kafka连接独立服务的connect-standalone.properties中提到了以下内容。
key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
配置具有这些特性的卡夫卡消费者代码:
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
在消费者实施,以下是阅读的关键和值组件的代码。我使用REST从Schema Registry获取键和值的架构。
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
解析键工作正常。在解析消息的值部分时,我得到了ArrayIndexOutOfBoundsException。
下载了Avro的源代码并进行了调试。发现GenericDatumReader.readInt方法返回一个负值。这个值预计是一个数组的索引(符号),因此应该是正数。
尝试使用kafka-avro-standalone-consumer使用事件,但它也抛出了ArrayIndexOutOfBoundsException。所以,我的猜测是这个消息在Kafka连接(制作人)&上编码不正确,问题出在配置上。
以下是问题:
- 这有什么错与生产者或消费者通过配置?
- 为什么重要的反序列化工作,但没有价值?
- 还有什么需要做的事情的工作? (如指定字符编码的地方)。
- Can Debezium with Avro可用于生产,还是现在的实验性功能?关于Debezium Avro的文章特别指出,涉及Avro的例子将会包含在内。
已经有很多帖子,在Avro反序列化抛出ArrayIndexOutOfBoundsException但无法将其与我面临的问题相关联。