2017-02-02 13 views
2

我正在使用Avro序列化对象,然后将它们添加到将由客户端使用和反序列化的Kafka消息中。我尝试了几种不同的序列化方法,但他们都没有将模式嵌入到数据流中。这是我的序列化代码的最新版本。您可以看到注释掉的尝试使用各种可用的编写器。C#Avro Schema未使用数据流编码

public static byte[] Serialize<T>(T recordObj) where T : ISpecificRecord 
    { 
     Log.Info("Serializing {0} object to Avro.", typeof(T)); 
     try 
     { 
      using (var ms = new MemoryStream()) 
      { 
       var encoder = new BinaryEncoder(ms); 
       //var writer = new SpecificDefaultWriter(recordObj.Schema); 
       var writer = new SpecificDatumWriter<T>(recordObj.Schema); 
       //writer.Write(recordObj.Schema, recordObj, encoder); 
       writer.Write(recordObj, encoder); 
       return ms.ToArray(); 
      } 
     } 
     catch (Exception ex) 
     { 
      Log.Error("Failed to Avro serialize object. {0}", ex); 
      return null; 
     } 
    } 

我不太确定还有什么可以尝试的。

回答

1

经过在实际的Avro代码中挖掘后,我发现我需要一个FileWriter,但无法弄清楚如何实例化一个,因为DataFileWriter没有公共构造函数。原来在DataFileWriter类中有一个名为OpenWriter的静态方法,它接受一个DatumWriter和一个Stream并返回一个DataFileWriter。下面的代码现在在结果数据流中正确包含了对象元数据。

public static byte[] Serialize<T>(T recordObj) where T : ISpecificRecord 
    { 
     Log.Info("Serializing {0} object to Avro.",typeof(T)); 
     try 
     { 
      using(var ms = new MemoryStream()) 
      { 
       var specDatumWriter = new SpecificDatumWriter<T>(recordObj.Schema); 
       var specDataWriter = Avro.File.DataFileWriter<T>.OpenWriter(specDatumWriter, ms); 
       specDataWriter.Append(recordObj); 
       specDataWriter.Flush(); 
       specDataWriter.Close(); 
       return ms.ToArray(); 
      } 
     } 
     catch(Exception ex) 
     { 
      Log.Error("Failed to Avro serialize object. {0}",ex); 
      return null; 
     } 
    }