我想在火花流中使用Kryo串行器。我在Spark tuning docs阅读 -
最后,如果您没有注册您的自定义类,KRYO仍将 工作,但它必须完整的类名存储与每个对象, 这是一种浪费。
所以我想要注册所有的类。我的情况下,类 -
trait Message extends java.io.Serializable
object MutableTypes {
type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
type Parents = scala.collection.mutable.Map[Int, Childs]
}
case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message
,我注册类这样的 -
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired","true")
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord]))
我得到这个异常:
com.esotericsoftware.kryo.KryoException: 的java.lang .IllegalArgumentException:类未注册: scala.Tuple2 $ mcJZ $ sp注意:要注册此类,请使用: kryo.register(scala.Tuple2 $ mcJZ $ sp.class);序列化跟踪:在 com.esotericsoftware.kryo.serializers.FieldSerializer $ ObjectField.write(FieldSerializer.java:585) 在 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer parents_to_add(com.test.IncomingRecord)的.java:213) 在com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
在 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) 在 有机apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185) 在 org.apache.spark.util.collection.WritablePartitionedPairCollection $$不久$ 1.writeNext(WritablePartitionedPairCollection.scala:56) 在 org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659) 在 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 在 org.apache。 spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89)at org.apache.spark.executor.Executor $ TaskRunner.run( Executor.scala:214) at(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread。) java:745)
我该如何注册我的课程?如何解决这个问题?
更新:
我知道转弯registeration false将删除异常,但不会因为额外的开销,许多性能增加。我想知道如何注册我的课程。
不知道'classOf [Tuple2 [长,布尔]'选择专业类。你没有得到这个表情的警告吗?也许你可以在没有专业化的情况下用'case class'替换它。 –
我没有得到任何警告 –