2016-08-23 75 views
4

我需要通过添加两个新参数来修改类。这个类与Kryo序列化。 我每次停止流时都会持续保留与此课程有关的信息,其中包括RDD。 当我重新启动流时,我加载了先前保存的信息,并使用它们在停止和重新启动时保持一致。Kryo:反序列化旧版本的类

由于I类坚持需要这些新参数,因此我通过为新参数添加新的kryo.writeObject(output, object, ObjectSerializer)kryo.readObject(input, classOf[Object], ObjectSerializer)来更改类和序列化程序。

现在,无论何时重新启动我的流,我都会得到一个异常:“遇到未注册的类...”。

这似乎很明显,因为我试图反序列化一个对象,而这个对象不包含在当我停止流时持久化的信息中。 如果我删除这些数据并启动流,就好像它没有任何先前的运行一样,则不会发生异常。

有没有办法避免这种异常? 也许通过指定一些默认值来防止这些参数丢失?

谢谢

编辑:

我发现一些有用的东西,我没有看到前: Kryo issue 194

这个家伙通过简单地插入一个长的定义他应该使用的解串器版本来实现版本控制。 这是一个简单的解决方案,但是,由于编写我正在处理的代码的公司没有考虑向前兼容性,我想我必须在新序列化器之前将所有数据保留下来。

请让我知道如果你有人可以提出更好的解决方案。

编辑2:

这种局面仍然没有解决。 我试图按照此处所述使用CompatibleFieldSerializer:CompatibleFieldSerializer Example 因此,通过注册此序列化程序而不是以前使用的自定义程序。 结果是,现在,当重新加载持久数据时,它给出了一个java.lang.NullPointerException。 如果以前的数据不存在,仍然没有问题。我可以启动我的流,序列化新数据,停止流,反序列化并重新启动我的流。 仍然没有线索的决议。

回答

2

问题的解决方案是在几个月前发现的。所以我想尽快回答这个问题。 问题在于,由于代码中的错误,该类使用标准的Kryo FieldSerializer序列化,该序列不兼容。 我们必须执行以下操作来反序列化旧类并将其转换为新的序列化类。

的情况是:

case class ClassA(field1 : Long, field2 : String) 

它连载这样的:

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     classA( 
      field1 = input.readLong(), 
      field2 = input.readLong() 
     ) 

,并与串行序列化包含这些类序列,以注册所有人的所有序列化是循环类。

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    final def register(kryo: Kryo) = { 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

该类需要通过添加一个新字段进行修改,该字段是另一个案例类的实例。

为了进行这样的改变,我们不得不使用有关的KRYO库“可选”的注释,

... 
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional 
import scala.annotation.meta.field 
... 

case class ClassA(field1 : Long, field2 : String, @(Optional @field)("field3") field3 : ClassB) 

串行器被修改,例如读取旧的序列化的类时,它可以实例字段3用默认值和,写入时,写这样默认值:

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
     kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer) 

} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     ClassA( 
      field1 = input.readLong(), 
      field2 = input.readLong(), 
      field3 = ClassB.default 
     ) 

的KRYO串行登记还修改也注册可选字段:

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    def optionals = Seq("field3") 

    final def register(kryo: Kryo) = { 
     optionals.foreach { optional => 
     kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) } 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

因此,我们能够编写新版本的序列化类。 之后,我们必须删除可选注释,修改序列化程序以便从新的序列化类读取实际字段,并删除可选的序列化程序注册并将其添加到注册表Seq。

与此同时,我们纠正了强制通过FieldSerializer进行序列化的代码中的错误,但这不在问题的范围之内。