当试图读取Avro文件时,我遇到了同样的问题。原因是AvroWrapper没有实现java.io.Serializable
接口。
解决方案是使用org.apache.spark.serializer.KryoSerializer
。
import org.apache.spark.SparkConf
val cfg = new SparkConf().setAppName("MySparkJob")
cfg.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
cfg.set("spark.kryo.registrator", "com.stackoverflow.Registrator")
但是这还不够,因为我的课,这是在Avro的文件,没有实行Serializable
无论是。
因此我添加了自己的注册人,扩展为KryoRegistrator
,并包含chill-avro库。
class Registrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[MyClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[MyClassInAvroFile])
kryo.register(classOf[AnotherClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[AnotherClassInAvroFile])
}
}
然后,我能读这样的文件:
ctx.hadoopFile("/path/to/the/avro/file.avro",
classOf[AvroInputFormat[MyClassInAvroFile]],
classOf[AvroWrapper[MyClassInAvroFile]],
classOf[NullWritable]
).map(_._1.datum())
你直接收集的?你应该将它们映射到一些Serializable类,因为它们不能被序列化。 – zsxwing 2014-11-21 02:04:27