2016-06-28 178 views
0

我正在使用nerdammer hbase spark连接器并读取两个hbase表,因为RDD会将它们转换为数据框并运行SQL以加入它的预期工作。Hbase Spark RDD JSON列

其中一个表中的一列中有JSON对象我需要在最终结果中提取特定的JSON属性值这怎么可能。 如果我在ARDD的D列中有Json数据,例如[{“foo”:“bar”,“baz”:“qux”}],我需要创建新的RDD或DF,其值仅为“baz”列,以便最终当我连接时,我只获得这个属性的值。

val ARDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](ATableName) 
     .select("A","B","C","D","E").inColumnFamily("pCF") 

     val BRDD = sc.hbaseTable[(Option[String],Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](BTableName) 
     .select("A","B","C","D","E","F").inColumnFamily("tCF") 


    val ADF = sqlContext.createDataFrame(ARDD).registerTempTable("aDF") 
    val BDF = sqlContext.createDataFrame(BRDD).registerTempTable("bDF") 

val resultset = sqlContext.sql("SELECT aDF._1, bDF._2, bDF._3, bDF._4, bDF._5, bDF._6, bDF._3, aDF._1, aDF._2, bDF._1 FROM aDF, bDFWHERE aDF._5 = bDF._7").collect() 

val joinedResult = resultset.foreach(println) 
    println("Count " + joinedResult) 

回答

0

创建一个UDF来实现这一目标,在我的DF拥有该分析信息创建新的列

import org.json4s.jackson.JsonMethods._ 
import org.apache.spark.sql.functions.udf 
def udfScoreToCategory=udf((t: String) => { 
    compact((parse(t.toString,true) \ "myField"))}) 


val abc= myDF.withColumn("_p", udfScoreToCategory(myDF("_4"))).registerTempTable("odDF")