2013-12-16 61 views
16

我在Spark,我有一个Avro文件的RDD。我现在想要做对RDD一些转换,并将其保存回为Avro的文件:Spark:写Avro文件

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

运行此星火抱怨架构$ recordSchema是不可序列。

如果我取消注释.map调用(并且只有rdd.saveAsNewAPIHadoopFile),则调用成功。

我在这里做错了什么?

有什么想法?

+0

您能否提供异常堆栈跟踪? Spark,Hadoop和Avro版本号也可能有用。 – Wildfire

+0

请原谅我的天真。请问在这里做什么工作?看起来这是一个地图减少工作?如果我们使用spark来写出,为什么我们需要map reduce作业? –

回答

2

Spark使用的默认序列化程序是Java序列化。所以对于所有的Java类型,它都会尝试使用Java序列化进行序列化。 AvroKey不可序列化,所以你会得到错误。

您可以在您的自定义序列化(如Avro)中使用KryoSerializer或插件。你可以在这里阅读关于序列化的更多信息http://spark-project.org/docs/latest/tuning.html

你也可以用可外部化的东西包装你的对象。查看例如包含AvroFlumeEvent的SparkFlumeEvent:https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

此处的问题与Job中使用的avro.Schema类的非可序列化有关。当您试图从map函数内的代码引用架构对象时,会引发异常。

举例来说,如果你尝试做如下,您将得到“任务不序列化”例外:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

可以让一切由刚刚创建模式的新实例工作功能块内:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

既然你不喜欢解析为每次处理记录的Avro架构,更好的解决方案将是解析划分级别的模式。以下也可以工作:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

上面的代码,只要你提供一种便携式参考jsonSchema文件中,由于地图的功能是要由多个远程执行人被执行的工作原理。它可以是对HDFS中文件的引用,也可以与JAR中的应用程序一起打包(在后一种情况下,您将使用类加载器函数来获取其内容)。

对于那些谁尝试使用Avro的星火,发现仍存在一些未解决的编译问题,你必须使用Maven的POM以下导入:

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

注意"hadoop2"分类。您可以在https://issues.apache.org/jira/browse/SPARK-3039上跟踪问题。

+0

当我们的map函数中没有外部依赖关系时,此方法正常工作。有什么方法可以使模式序列化? – COSTA