2016-08-12 120 views
0

我有一个java卡夫卡消费者,从卡夫卡消费avro数据[说主题x]。它应该将这些数据推送到HDFS,因为它不需要代码生成。在Avro公司documentation他们使用类似以下内容:写AVRO数据到Hadoop hdfs

GenericRecord e1 = new GenericData.Record(schema);  
e1.put("key", "value"); 

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); 

DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
dataFileWriter.create(schema, new File("<HDFS file path>")); 

dataFileWriter.append(e1); 
dataFileWriter.close(); 

问题的,这是,我已经有Avro的数据。要使用这一系列步骤,我必须在反序列化avro数据包之后提取每个键 - 值对,然后将其推送到GenericRecord对象,我认为这没有任何意义。我没有找到任何我想要实现的例子。任何提示或链接到相关文件非常感谢。

+2

您有没有考虑使用Kafka内置的Kafka Connect框架结合现有的Kafka-> HDFS连接器,如[kafka-connect-hdfs](https:// github .COM/confluentinc /卡夫卡连接-HDFS)?链接的HDFS接收器连接器支持Avro开箱即用。 –

+0

由于一些模式注册问题,我们已经从融合中移除。所以我不能使用融合框架。 – Bitswazsky

+0

关心详细说明您已经遇到的模式注册表问题?当然也可以通过https://github.com/confluentinc/schema-registry/issues完成。 –

回答

0

如果我正确理解你的问题,我建议你尝试com.twitter.bijection.Injection和com.twitter.bijection.avro.GenericAvroCodecs软件包,例如。

看看这里http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-producing-and-consuming-avro-messages.html。 那里,在卡夫卡生产者的GenericRecord被转换为字节[],这是放在卡夫卡话题,然后在消费者这个字节根据您的架构倒置成一个GenericRecord。而且您不需要将值记录到记录中的所有字段。之后,您可以将此记录写入文件。

而且,您也可能需要以其他方式访问HDFS中的文件,因为您无法为其创建File实例。