2013-06-28 42 views
2

我有一个复杂的Avro记录(嵌套记录,联合类型),我在HBase中存储为一个值。从Java的HBase中读取Avro记录

Schema schema = new Schema.Parser().parse(schema_file); 
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema); 
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(avro_file, datumReader); 
GenericRecord record = null; 
record = dataFileReader.next(record); 

我再检查HBase的,看是否具有相同rowkey的记录已经存在:我在Avro的数据文件中使用,目前该文件的作家模式相匹配的模式,并且在HBase的记录读取。由我获取返回的val是Avro的记录的字节数组表示:

Configuration conf = HBaseConfiguration.create(); 
HTable table = new HTable(conf, "table"); 
String pk = new String(record.get("x").toString()+record.get("y").toString()); 
Get get = new Get(Bytes.toBytes(pk)); 
Result result = table.get(get); 
byte[] val = result.getValue(Bytes.toBytes("c"),Bytes.toBytes("c")); 

如果没有记录在HBase的具有相同rowkey,我把纪录:

if (val == null) { 
    System.out.println("pk: "+pk+" does not exist"); 
    Put put = new Put(Bytes.toBytes(pk)); 
    put.add(Bytes.toBytes("c"), Bytes.toBytes("c"), Bytes.toBytes(record.toString())); 
    try { 
    table.put(put); 
    } catch (Exception e) { 
    System.err.println("Can't put to table: " + e); 
    } 
} 
else { 
    System.out.println("pk: "+pk+" does exist"); 
    //help me! 
} 

如果有在HBase中使用相同rowkey的记录,我想将HBase结果的字节数组转换回Avro模式,然后比较几个字段以查看哪个记录“更好”。我想把这个'更好'的唱片放入HBase中,但我被卡住了。如何将字节数组从HBase转换为GenericRecord,以便我可以比较文件记录和HBase之间的字段?

回答

4

我想通了。我需要将我的记录作为序列化字节数组写入HBase,而不是将字符串转换为字节数组。

看跌变为:

ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(writer); 
dfw.create(schema, baos); 
dfw.append(record); 
dfw.close(); 
Put put = new Put(Bytes.toBytes(pk)); 
put.add(Bytes.toBytes("c"), Bytes.toBytes("c"), baos.toByteArray()); 

这让:

GenericRecord hrecord = null; 
    ByteArrayInputStream bais = new ByteArrayInputStream(val); 
    DataFileStream<GenericRecord> dfs = new DataFileStream<GenericRecord>(bais, datumReader); 
    hrecord = dfs.next(hrecord);