0
我正在尝试将过滤器应用于来自Kafka Direct Stream的json数据流。 我使用net.liftweb lift-json_2.11
来解析示例JSON {"type": "fast", "k":%d}
。 这是我的代码:使用apache spark过滤JSON(Scala SQL) - Scala
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val s1 = stream.map(record => parse(record.value))
的s1.print()
结果是:
...
JObject(List(JField(type,JString(fast)), JField(k,JInt(11428))))
JObject(List(JField(type,JString(fast)), JField(k,JInt(11429))))
JObject(List(JField(type,JString(fast)), JField(k,JInt(11430))))
...
我怎样才能在k
场施加的火花过滤器?例如: k%2==0
我不想使用SparkSQL,因为我需要应用连接数据流和SparkSQL不允许我这样做。 感谢
你就不能应用'.filter'你'stream.map(记录=>解析(record.value))'? – Interfector
定义一个表示JSON条目的case类,例如'case class Entry(type:String,k:Int)',然后使用'parse(record.value).extract [Entry]'获得'Entry'的流。过滤应该归结为简单地执行's1.filter(e => e.k%2 == 0)'。 –
@HristoIliev如果我试图得到一个'在线程中的异常“主”org.apache.spark.SparkException:任务不可序列化“错误 –