我创建了一个JSON数据,并为它的Avro的模式:如何从HDFS中检索Avro数据?
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
和
{ “类型”: “记录”, “名”: “twitter_schema” “namespace”:“com.miguno.avro”,“fields”:[{“012”:“name”:“username”, “type”:“string”, “doc”:“ com“},{ ”name“:”tweet“, ”type“:”string“, ”do c“:”用户的Twitter消息的内容“},{ ”name“:”timestamp“, ”type“:”long“, ”doc“:”Unix epoch time in seconds“}]”doc :”: “用于存储的Twitter消息的基本模式”}
我然后将其转化为阿夫罗如下:
java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
将文件上HDFS与此:
hadoop fs -copyFromLocal twitter.avro <path>
做的时候
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但是:
avroRDD.first
我面临以下异常:
org.apache.spark
,然后在星火CLI出具的本人代码。 SparkException:由于阶段失败导致作业中止: 阶段7.0(TID 13)中的任务2.0具有不可序列化的结果: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala:1174) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1173) 在 scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray .scala:59) 在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
什么是它的解决方案?