Filtering JSON using Apache Spark (Scala SQL) - Scala

I am trying to apply a filter to a stream of JSON data coming from a Kafka Direct Stream. I am using net.liftweb lift-json_2.11 to parse the sample JSON {"type": "fast", "k":%d}. Here is my code:

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) 

val s1 = stream.map(record => parse(record.value)) 

s1.print()

The result is:

... 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11428)))) 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11429)))) 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11430)))) 
... 

How can I apply a Spark filter on the k field? For example: k % 2 == 0

I don't want to use Spark SQL, because I need to join data streams and Spark SQL does not let me do that. Thanks
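For context, a minimal sketch of such a filter applied directly to the parsed lift-json 'JValue' stream above (the 'even' value and the pattern match are illustrative, not part of the original post):

//sketch: keep only the records whose "k" field is even
import net.liftweb.json._

val even = s1.filter { jv =>
  (jv \ "k") match {
    case JInt(k) => k % 2 == 0
    case _       => false
  }
}
even.print()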


Can't you just apply '.filter' to your 'stream.map(record => parse(record.value))'? – Interfector


Define a case class that represents a JSON entry, e.g. 'case class Entry(type: String, k: Int)', then use 'parse(record.value).extract[Entry]' to get a stream of 'Entry'. Filtering then boils down to simply doing 's1.filter(e => e.k % 2 == 0)'. – Hristo Iliev
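For illustration only, a rough sketch of that suggestion using lift-json as in the question ('Entry' is the commenter's hypothetical class; 'type' is backquoted because it is a Scala keyword, and the formats are created inside the closure to keep the lambda serializable, as the answer below also does):

//sketch of the case-class approach suggested above (lift-json)
import net.liftweb.json._

case class Entry(`type`: String, k: Int)

val entries = stream.map { record =>
  implicit val formats: Formats = DefaultFormats
  parse(record.value).extract[Entry]
}
val even = entries.filter(e => e.k % 2 == 0)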


@HristoIliev If I try that I get an 'Exception in thread "main" org.apache.spark.SparkException: Task not serializable' error –

Answer


SOLUTION:

//spark import 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

//kafka import 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 

//json library import 
import org.json4s._ 
import org.json4s.native.JsonMethods._ 
import org.json4s.native.Serialization 
import org.json4s.native.Serialization.{read, write} 

object App {

    def main(args: Array[String]) {

        // Create the streaming context with a 5 second batch interval
        val sparkConf = new SparkConf().setAppName("SparkScript").setMaster("local[4]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Case class describing one JSON record
        case class MySens(elem: String, k: Int, pl: String)

        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
            "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
            "group.id" -> "test_luca",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topics1 = Array("fast-messages")

        val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))

        // Parse every Kafka record into a MySens instance; the json4s formats
        // are created inside the closure, so the lambda stays self-contained
        val s1 = stream.map { record =>
            implicit val formats = DefaultFormats
            parse(record.value).extract[MySens]
        }

        // Keep only the records whose k field is a multiple of 10
        val p1 = s1.filter(e => e.k % 10 == 0)

        p1.print()

        ssc.start()
        ssc.awaitTermination()
    }

}
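One detail worth noting in this solution: 'implicit val formats = DefaultFormats' is created inside the 'map' closure rather than once on the driver, so the closure only captures serializable values; this is presumably what avoids the 'Task not serializable' error mentioned in the comments above.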