2015-01-12 16 views
0

我有一个如下所示的Avro模式 - 如何在解码原始字节数组后更改特定的字段值?

{ 
    "type":"record", 
    "name":"new_user", 
    "namespace":"com.hello", 
    "fields":[ 
     { 
     "name":"user_id", 
     "type":[ 
      "long", 
      "null" 
     ] 
     }, 
     { 
     "name":"segment", 
     "type":[ 
      "string", 
      "null" 
     ] 
     } 
    ] 
} 

我使用上面的Avro模式按如下方式序列化数据,得到了一个字节数组,这部分工作正常:

public static void main(String[] args) throws IOException {
    // Avro schema of the new_user record, kept as an inline JSON literal.
    String schemaJson = "{ \"type\":\"record\", \"name\":\"new_user\", \"namespace\":\"com.hello\", \"fields\":[ { \"name\":\"user_id\", \"type\":[ \"long\", \"null\" ] }, { \"name\":\"segment\", \"type\":[ \"string\", \"null\" ] } ] }";
    Schema schema = new Parser().parse(schemaJson);

    // Binary-encoded sample record produced by the helper below.
    byte[] originalAvrodata = getAvroBinaryData(schema);

    // Open question: how to obtain a newAvroData byte array in which
    // user_id has been changed to some other random long number?
}

/**
 * Builds a sample record (user_id 123456, segment "hello") for the given
 * schema and returns its Avro binary encoding.
 *
 * @param schema schema the sample record is created and written with
 * @return the binary-encoded bytes of the sample record
 * @throws IOException if writing to or flushing the encoder fails
 */
private static byte[] getAvroBinaryData(Schema schema) throws IOException {
    GenericRecord sample = new GenericData.Record(schema);
    sample.put("user_id", 123456L);
    sample.put("segment", "hello");

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);

    new GenericDatumWriter<GenericRecord>(schema).write(sample, encoder);
    // The encoder buffers internally; flush before reading the stream.
    encoder.flush();
    return buffer.toByteArray();
}

问题陈述:

我需要将originalAvrodata字节数组解码,把user_id字段的值改为另一个随机的long数值,然后使用相同的模式构造一个新的字节数组newAvroData,其中user_id字段的值就是这个随机long数值。使用Avro有办法实现吗?

回答

1

当然可以,下面是一段带注释的代码,应该能帮助你入门:

/**
 * Demonstrates an Avro round trip: builds a sample record, serializes it,
 * decodes the bytes again, increments the {@code user_id} field and
 * re-serializes the modified record with the same schema.
 *
 * @param args unused
 * @throws IOException if encoding or decoding the record fails
 */
public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser()
            .parse("{ \"type\":\"record\", \"name\":\"new_user\", \"namespace\":\"com.hello\", \"fields\":[ { \"name\":\"user_id\", \"type\":[ \"long\", \"null\" ] }, { \"name\":\"segment\", \"type\":[ \"string\", \"null\" ] } ] }");

    // create example record
    GenericRecord record = new GenericData.Record(schema);
    record.put("user_id", 123456L);
    record.put("segment", "hello");

    // serialize record
    byte[] recordData = getAvroBinaryData(schema, record);

    // de-serialize byte array to record
    GenericRecord readRecord = readRecord(schema, recordData);
    if (readRecord == null) {
        // Defensive: a null record would otherwise surface as an opaque NPE below.
        throw new IllegalStateException("could not decode the serialized record");
    }

    // user_id is declared as a ["long", "null"] union, so check before unboxing
    Long userId = (Long) readRecord.get("user_id");
    if (userId == null) {
        throw new IllegalStateException("user_id is null, nothing to increment");
    }

    // increment user_id field
    readRecord.put("user_id", userId + 1);

    // prints 123457 for the user_id
    System.out.println(readRecord);

    // serialize updated record
    byte[] updatedRecordData = getAvroBinaryData(schema, readRecord);

    // do something with updatedRecordData
}

    /**
     * Decodes an Avro binary payload into a generic record.
     *
     * @param schema schema handed to the datum reader
     * @param originalAvrodata Avro binary-encoded bytes of a single record
     * @return the decoded record
     * @throws IOException if the bytes cannot be decoded; an
     *         {@code EOFException} raised while reading is propagated
     *         rather than swallowed
     */
    private static GenericRecord readRecord(Schema schema, byte[] originalAvrodata) throws IOException {
        Decoder decoder = DecoderFactory.get().binaryDecoder(originalAvrodata, null);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

        // EOFException is an IOException, so let it propagate: returning null
        // here would only defer the failure to a NullPointerException in the caller.
        return reader.read(null, decoder);
    }

    /**
     * Serializes the supplied record to Avro binary using the given schema.
     * Unlike the single-argument variant, the record to write is passed in
     * by the caller.
     *
     * @param schema schema the datum writer is created with
     * @param record record to serialize
     * @return the binary-encoded bytes of {@code record}
     * @throws IOException if writing to or flushing the encoder fails
     */
    private static byte[] getAvroBinaryData(Schema schema, GenericRecord record) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);

        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        // The encoder buffers internally; flush before reading the stream.
        encoder.flush();
        return buffer.toByteArray();
    }
相关问题