2016-08-02 63 views
3

I am trying to read Avro data from a Kafka topic using Flink 1.0.3. How do I decode the Kafka messages using Avro and Flink?

All I know is that this particular Kafka topic carries Avro-encoded messages, and I have the Avro schema file.

My Flink code:

public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092"); 
     properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181"); 
     properties.setProperty("group.id", "Zeeshantest"); 
     AvroDeserializationSchema<Event> avroSchema = new AvroDeserializationSchema<>(Event.class); 
     FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties); 
     DataStream<Event> messageStream = env.addSource(kafkaConsumer); 
     messageStream.rebalance().print(); 
     env.execute("Flink AVRO KAFKA Test"); 
    } 

I created my Event.java file using avro-tools and the schema "rocana.avsc"; the rocana.avsc file is uploaded on GitHub.

AvroDeserializationSchema.java

import org.apache.avro.io.BinaryDecoder; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.reflect.ReflectDatumReader; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.flink.api.common.typeinfo.TypeInformation; 
import org.apache.flink.api.java.typeutils.TypeExtractor; 
import org.apache.flink.streaming.util.serialization.DeserializationSchema; 

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { 

    private static final long serialVersionUID = 4330538776656642778L; 

    private final Class<T> avroType; 
    private transient DatumReader<T> reader; 
    private transient BinaryDecoder decoder; 

    public AvroDeserializationSchema(Class<T> avroType) { 
     this.avroType = avroType; 
    } 

    @Override 
    public T deserialize(byte[] message) { 
     ensureInitialized(); 
     try { 
      decoder = DecoderFactory.get().binaryDecoder(message, decoder); 
      return reader.read(null, decoder); 
     } catch (Exception e) { 
      throw new RuntimeException(e); 
     } 
    } 

    @Override 
    public boolean isEndOfStream(T nextElement) { 
     return false; 
    } 

    @Override 
    public TypeInformation<T> getProducedType() { 
     return TypeExtractor.getForClass(avroType); 
    } 

    private void ensureInitialized() { 
     if (reader == null) { 
      if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) { 
       reader = new SpecificDatumReader<T>(avroType); 
      } else { 
       reader = new ReflectDatumReader<T>(avroType); 
      } 
     } 
    } 
} 

When I run my program, I get the following error:

17:25:30,759 INFO org.apache.zookeeper.ZooKeeper        - Session: 0x356350cb9001857 closed 
17:25:30,759 INFO org.apache.zookeeper.ClientCnxn        - EventThread shut down 
17:25:30,761 INFO org.apache.flink.runtime.taskmanager.Task      - Source: Custom Source (3/4) switched to FAILED with exception. 
java.lang.Exception: 2 
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222) 
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316) 
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) 
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) 
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) 
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) 
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274) 
    at org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52) 
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) 
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657) 
17:25:30,769 INFO org.apache.flink.runtime.taskmanager.Task      - Freeing task resources for Source: Custom Source (3/4) 
17:25:30,776 INFO org.apache.flink.runtime.taskmanager.Task      - Sink: Unnamed (1/4) switched 

I think my deserialization code is incorrect. Does anyone know what I am doing wrong? Is this the right way to read Avro data from Kafka with Flink, or is there a better way?

+0

Are you sure the data in Kafka was encoded with exactly the same schema version? By creating a `new SpecificDatumReader<>(avroType)` you are telling the datum reader that the `avroType` schema is both the writer and the reader schema. I believe you can get these kinds of exceptions if a different version of the schema was in fact originally used to encode the messages. – Josh
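Josh's suggestion could be sketched as follows. This is not runnable as-is: it assumes the producer's actual schema version is available in a separate file (the file name here is hypothetical) and that `Event` is the avro-tools-generated class, which exposes a static `getClassSchema()`:

```
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificDatumReader;

// Parse the schema version the producer actually wrote with
// (file name is hypothetical -- substitute the producer's real schema).
Schema writerSchema = new Schema.Parser()
        .parse(new File("./src/main/resources/rocana_writer.avsc"));

// Give the reader BOTH schemas so Avro can resolve the differences;
// new SpecificDatumReader<>(Event.class) alone assumes writer == reader.
SpecificDatumReader<Event> reader =
        new SpecificDatumReader<>(writerSchema, Event.getClassSchema());
```

With this two-argument constructor, Avro performs schema resolution between the writer and reader schemas instead of assuming they are identical.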

+0

Yes, the schema file is correct. I have used the same `schema file` with the same `kafka topic` in `logstash` and it worked flawlessly. – Zeeshan

Answers

0

The code I posted in my question works perfectly fine.

The problem was with the data being sent to the Kafka topic: data in both JSON and Avro format was being sent there. I subscribed to a different Kafka topic where the data is Avro only, and my code works fine.
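For reference, one defensive option (my own assumption, not part of the original setup) would have been to skip the stray JSON payloads inside `deserialize` instead of letting the job fail: JSON messages start with `{` or `[` after optional whitespace, while Avro binary records for a record schema do not. A minimal guard:

```java
import java.nio.charset.StandardCharsets;

public class AvroOrJsonGuard {

    // Returns true if the payload looks like a JSON document rather than
    // an Avro binary record: first non-whitespace byte is '{' or '['.
    static boolean looksLikeJson(byte[] message) {
        for (byte b : message) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            return b == '{' || b == '[';
        }
        return false;
    }

    public static void main(String[] args) {
        byte[] json = "{\"type\":\"event\"}".getBytes(StandardCharsets.UTF_8);
        byte[] binary = new byte[] {0x02, 0x10, 0x6a};
        System.out.println(looksLikeJson(json));   // true
        System.out.println(looksLikeJson(binary)); // false
    }
}
```

Dropping such records (e.g. returning null and filtering downstream) keeps the source task alive on a mixed topic, at the cost of silently discarding the JSON messages.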

2

Try the following code to deserialize the Avro record:

Schema schema; // your Avro schema 
DatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>(schema); 
GenericData.Record record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); 
+0

No, that does not work; same error. I am creating the schema with: `Schema schema = new Schema.Parser().parse(new File("./src/main/resources/rocana.avsc"));` – Zeeshan

+0

If the data is really only Avro-encoded, could you check how you are ingesting the data into Kafka? If you are using some library, look at the encoder it uses and decode with the matching one. – Garry

+3

The problem was with the data being sent to the Kafka topic: data in both JSON and Avro format was being sent there. I subscribed to a different Kafka topic where the data is Avro only, and my code works fine. – Zeeshan