2014-12-19 33 views
2

我创建了一个JSON数据,并为它的Avro的模式:如何从HDFS中检索Avro数据?

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 } 
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 } 

{ “类型”: “记录”, “名”: “twitter_schema” “namespace”:“com.miguno.avro”,“fields”:[{“012”:“name”:“username”, “type”:“string”, “doc”:“ com“},{ ”name“:”tweet“, ”type“:”string“, ”do c“:”用户的Twitter消息的内容“},{ ”name“:”timestamp“, ”type“:”long“, ”doc“:”Unix epoch time in seconds“}]”doc :”: “用于存储的Twitter消息的基本模式”}

我然后将其转化为阿夫罗如下:

java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro 

将文件上HDFS与此:

hadoop fs -copyFromLocal twitter.avro <path> 
做的时候

import org.apache.avro.generic.GenericRecord 
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} 
import org.apache.hadoop.io.NullWritable 

val path = "hdfs:///path/to/your/avro/folder" 
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path) 

但是:

avroRDD.first 

我面临以下异常:

org.apache.spark

,然后在星火CLI出具的本人代码。 SparkException:由于阶段失败导致作业中止: 阶段7.0(TID 13)中的任务2.0具有不可序列化的结果: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala:1174) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1173) 在 scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray .scala:59) 在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

什么是它的解决方案?

回答

3

Spark正在尝试对您的avro数据进行ser/de,但它不是“java serializable”(spark中使用的默认服务)。

你有几个选择:

  • 提取物从包装通用记录每条记录映射到某个序列化结构
  • 生成特定记录类和DESER给他们,而不是通用的记录(你仍然会需要提取从包装备案)
  • 使KRYO序列化(这将在工作,某些情况下只有

请注意,记录会在内部重复使用,因此如果您例如使用rdd.collect,则最终将记录具有相同值的所有记录。在进行收集之前将原始输入数据映射到其他东西,以便在复制时解决问题。