2016-08-02 38 views
0

我们有一个流数据,我在HBase表中有一些主信息。对于每一行我都需要查找HBase主表并获取一些配置文件信息。我的代码是这样的foreach中的Spark Streaming过滤条件--NullPointerException

val con    = new setContext(hadoopHome,sparkMaster) 
val l_sparkcontext = con.getSparkContext 
val l_hivecontext = con.getHiveContext 

val topicname  = "events" 
val ssc    = new StreamingContext(l_sparkcontext, Seconds(30)) 
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10)) 
println("Kafka Stream for receiving Events..") 

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile") 
profile_data.foreach(println) 
val tabBC = l_sparkcontext.broadcast(profile_data) 

eventsStream.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val subs_profile_rows = tabBC.value 
    val Rows = record._2.split(rowDelim) 
    Rows.foreach(row => { 
     val values = row.split(colDelim) 
     val riid = values(1).toInt 
     val cond = "riid = " + riid 
     println("Condition : ", cond) 
     val enriched_events = subs_profile_rows.filter(cond) 
    }) // End of Rows 
    }) // End of RDD 
}) // End of Events Stream 

不幸的是,我总是打在过滤器上的NPE。我在这里接受了几个问题和答案,以跨工作节点广播价值观,但没有任何帮助。有人可以帮忙吗?

问候

巴拉

+0

检查您是否使用无法序列化的值。 – cchantep

+0

我不知道是否应在foreach内创建profile_data,这是不可序列化的。 –

回答

0

您的上下文用法看起来有点腥......对我来说,它看起来像你创建两个独立的环境中(一个火花,一个火花流),然后试图在这些上下文之间共享一个广播变量(这不起作用)。

我们有一些我们周围写的代码是相似的。这些视频展示了我们如何在Splice Machine(开源)中做到这一点,以防您感兴趣。我会尝试查找代码或让其他人为您发布代码。

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

好运。

+0

谢谢约翰。我会看看视频。要求是从HBase表中读取配置文件信息,以获取DStream的数据。我也获得了forEachPartition(将发布更改的代码作为下一条评论),但这给了我不同的错误。如果你能得到它,我将等待代码。非常感谢您的帮助 –

+0

由于篇幅限制,我将不得不将我的代码分成2篇文章。 - 开始class setContext(argHadoopHome:String,argSparkMaster:String){System.setProperty(“hadoop.home.dir”,argHadoopHome) val conf = new SparkConf()。setMaster(argSparkMaster); conf.setAppName(“Evts”); 私人VAL l_valSparkContext =新SparkContext(CONF) 私人VAL l_hiveContext =新HiveContext(l_valSparkContext) DEF getSparkContext = l_valSparkContext DEF getHiveContext = l_hiveContext DEF getconfContext = CONF } –

+0

对象receiveEvents { DEF主(参数:数组[String]):单位= { var rD =“\ r \ n” var cD =“,” var sM =“spark:// nm2:7077” var ip =“nm2:2181” var hadoopHome =“/ home/..” val con = new setContext(ip,sM) val l_sparkcontext = con.getSparkContext val topicname =“evt” val ssc = new StreamingContext(l_sparkcontext,Seconds(9)) val eventsStream \t = KafkaUtils.createStream(ssc,“nm2:2181”,“rcv”,Map(topicname.toString - > 2)) val profile_data = w_hivecontext.sql(“select gender,income,age from hb_cust_pro”) –