2017-08-04 136 views
0

Debezium连接器的Kafka连接事件是Avro编码。无法读取Kafka主题avro消息

在传递给Kafka连接独立服务的connect-standalone.properties中提到了以下内容。

key.converter=io.confluent.connect.avro.AvroConverter 
value.confluent=io.confluent.connect.avro.AvroConverter 
internal.key.converter=io.confluent.connect.avro.AvroConverter 
internal.value.converter=io.confluent.connect.avro.AvroConverter 
schema.registry.url=http://ip_address:8081 
internal.key.converter.schema.registry.url=http://ip_address:8081 
internal.value.converter.schema.registry.url=http://ip_address:8081 

配置具有这些特性的卡夫卡消费者代码:

Properties props = new Properties(); 
props.put("bootstrap.servers", "ip_address:9092"); 
props.put("zookeeper.connect", "ip_address:2181"); 
props.put("group.id", "test-consumer-group"); 
props.put("auto.offset.reset","smallest"); 
//Setting auto comit to false to ensure that on processing failure we retry the read 
props.put("auto.commit.offset", "false"); 
props.put("key.converter.schema.registry.url", "ip_address:8081"); 
props.put("value.converter.schema.registry.url", "ip_address:8081"); 
props.put("schema.registry.url", "ip_address:8081"); 

在消费者实施,以下是阅读的关键和值组件的代码。我使用REST从Schema Registry获取键和值的架构。

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null)); 

解析键工作正常。在解析消息的值部分时,我得到了ArrayIndexOutOfBoundsException。

下载了Avro的源代码并进行了调试。发现GenericDatumReader.readInt方法返回一个负值。这个值预计是一个数组的索引(符号),因此应该是正数。

尝试使用kafka-avro-standalone-consumer使用事件,但它也抛出了ArrayIndexOutOfBoundsException。所以,我的猜测是这个消息在Kafka连接(制作人)&上编码不正确,问题出在配置上。

以下是问题:

  1. 这有什么错与生产者或消费者通过配置?
  2. 为什么重要的反序列化工作,但没有价值?
  3. 还有什么需要做的事情的工作? (如指定字符编码的地方)。
  4. Can Debezium with Avro可用于生产,还是现在的实验性功能?关于Debezium Avro的文章特别指出,涉及Avro的例子将会包含在内。

已经有很多帖子,在Avro反序列化抛出ArrayIndexOutOfBoundsException但无法将其与我面临的问题相关联。

回答

相关问题