2016-11-23 55 views
0

我在单元测试中使用以下工作将Avro/Parquet中的单个对象写入Cloudera/HDFS群集中的文件。如何使用Avro/Parquet将实时数据写入HDFS?

这就是说,考虑到Parquet是一种列式格式,它似乎只能在批处理模式下写出整个文件(不支持更新)。

那么,什么是实时写入数据文件(通过ActiveMQ/Camel)的最佳实践(1k msg /秒的小信息等)?

我想我可以聚合我的消息(缓冲区在内存或其他临时存储),并使用动态文件名在批处理模式下写出来,但我觉得我错过了手动分区/文件命名的东西,等等......

Configuration conf = new Configuration(false); 
conf.set("fs.defaultFS", "hdfs://cloudera-test:8020/cm/user/hive/warehouse"); 

conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); 
AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class); 

Path path = new Path("/cm/user/hive/warehouse/test1.data"); 

MyObject object = new MyObject("test"); 

Schema schema = ReflectData.get().getSchema(object.getClass()); 

ParquetWriter<InboundWirelessMessageForHDFS> parquetWriter = AvroParquetWriter.<MyObject>builder(path) 
    .withSchema(schema) 
    .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) 
    .withDataModel(ReflectData.get()) 
    .withDictionaryEncoding(false) 
    .withConf(conf) 
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) //required because the filename doesn't change for this test 
    .build(); 

parquetWriter.write(object); 
parquetWriter.close(); 

回答

0

基于我的(有限的)研究...我假设文件不能被附加到(设计)...所以我只是必须批量实时数据(在内存或其他地方)在写出实木复合地板上的文件之前...

How to append data to an existing parquet file