2014-09-03 141 views
8

我很新的spark,我试图从kafka主题接收一个结构化为json的DStream,我想解析每个json的内容。我收到的JSON是这样的:在spark-streaming中解析json

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "} 

我想只提取IDENT领域,至少现在,我使用电梯,JSON库解析德数据。我的计划是这样的:

但低于抛出我的异常:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest 
    at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300) 
    at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560) 
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

的事情是,如果运行不使用火花(从文件中读取)同它完美。当我尝试将它放入火花程序时,问题就开始了。另外,如果我将解析器功能更改为如下所示:

def parser(json: String): JValue = { 
    val parsedJson = parse(json) 
    return (parsedJson \\ "ident") 
} 

它也有效。但是当我尝试提取实际的字符串时,我得到了同样的错误。

谢谢你的帮助。我希望我解释得很好。

+1

这可能是你正在使用的scala版本不匹配。 – 2014-09-03 12:20:13

+0

我可以认为应该解析“paso1.extract [PlaneInfo]”json.extract [PlaneInfo]? – Gillespie 2015-09-02 15:42:44

回答

2

发生这种情况是因为您缺少scala反映需要序列化/反序列化记录的依赖性。 尝试添加匹配spark版本的scala反射罐。

提示: “org.scala琅” % “斯卡拉-反映” %Version.scala

0

哦,好的老问题:-)

基本上,这表明一个版本的问题:一个您的依赖与您当前使用的Scala编译器不兼容。你在2.10吗?

尝试使用谷歌搜索短语“NoClassDefFoundError:scala/reflect/ClassManifest”,我相信你会发现有关该问题的足够描述。