2015-02-12 51 views
0

我是Spark SQL新手。 Concat函数在Spark Sql Query中不可用,因为我们已经注册了一个sql函数,在此函数中我需要访问另一个表。为此,我们在SQLContext对象上编写了spark sql查询。 当调用此查询我得到NullpointerException.please能帮上你这一点。如何在spark sql registeredFunction中使用spark SqlContext对象?

预先感谢

//此我我的代码

class SalesHistory_2(sqlContext:SQLContext,sparkContext:SparkContext) extends Serializable { 

import sqlContext._ 
import sqlContext.createSchemaRDD 
try{ 


sqlContext.registerFunction("MaterialTransformation", Material_Transformation _) 

    def Material_Transformation(Material_ID: String): String = 
{  

    var material:String =null; 
     var dd = sqlContext.sql("select * from product_master")  

material 
}  



    /* Product master*/ 
    val productRDD = this.sparkContext.textFile("D:\\Realease 8.0\\files\\BHI\\BHI_SOP_PRODUCT_MASTER.txt")   

    val product_schemaString = productRDD.first 

    val product_withoutHeaders = dropHeader(productRDD) 

    val product_schema = StructType(product_schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true))) 

    val productdata = product_withoutHeaders.map{_.replace("|", "| ")}.map(x=> x.split("\\|")) 
var product_rowRDD = productdata.map(line=>{ 
Row.fromSeq(line.map {_.trim() }) 
}) 

    val product_srctableRDD = sqlContext.applySchema(product_rowRDD, product_schema) 

    product_srctableRDD.registerTempTable("product_master") 
    cacheTable("product_master") 

    /* Customer master*/ 


    /* Sales History*/ 



    val srcRDD = this.sparkContext.textFile("D:\\Realease 8.0\\files\\BHI\\BHI_SOP_TRADE_SALES_HISTORY_DS_4_20150119.txt") 

    val schemaString= srcRDD.first 

    val withoutHeaders = dropHeader(srcRDD)  

    val schema = StructType(schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true))) 

    val lines = withoutHeaders.map {_.replace("|", "| ")}.map(x=> x.split("\\|")) 
var rowRDD = lines.map(line=>{ 
Row.fromSeq(line.map {_.trim() }) 
}) 

val srctableRDD = sqlContext.applySchema(rowRDD, schema) 

srctableRDD.registerTempTable("SALES_HISTORY") 



val srcResults = sqlContext.sql("SELECT Delivery_Number,Delivery_Line_Item,MaterialTransformation(Material_ID),Customer_Group_Node,Ops_ID,DC_ID,Mfg_ID,PGI_Date,Delivery_Qty,Customer_Group_Node,Line_Total_COGS,Line_Net_Rev,Material_Description,Sold_To_Partner_Name,Plant_Description,Originating_Doc,Orig_Doc_Line_item,Revenue_Type,Material_Doc_Ref,Mater_Doc_Ref_Item,Req_Delivery_Date FROM SALES_HISTORY") 

val path: Path = Path ("D:/Realease 8.0/files/output/") 
try { 
    path.deleteRecursively(continueOnFailure = false) 
} catch { 
    case e: IOException => // some file could not be deleted 
} 

    val successRDDToFile = srcResults.map { x => x.mkString("|")} 
    successRDDToFile.coalesce(1).saveAsTextFile("D:/Realease 8.0/files/output/")  

    } 
    catch { 
    case ex: Exception => println(ex) // TODO: handle error 
    } 

this.sparkContext.stop()  

DEF dropHeader(数据:RDD [字符串]):RDD [字符串] = {

data.mapPartitionsWithIndex((idx, lines) => { 
    if (idx == 0) { 
    lines.drop(1) 
    } 
    lines 
}) 

}

+0

什么码你尝试过这么远吗? – gasparms 2015-02-12 07:59:11

+0

嗨,我贴我在上面的代码,请查一下,帮我 – laxman 2015-02-12 08:50:35

回答

0

在销售历史RDD每一行调用SQL星火看起来像一个非常糟糕的主意:

val srcResults = sqlContext.sql("SELECT Delivery_Number,Delivery_Line_Item,MaterialTransformation(Material_ID),Customer_Group_Node,Ops_ID,DC_ID,Mfg_ID,PGI_Date,Delivery_Qty,Customer_Group_Node,Line_Total_COGS,Line_Net_Rev,Material_Description,Sold_To_Partner_Name,Plant_Description,Originating_Doc,Orig_Doc_Line_item,Revenue_Type,Material_Doc_Ref,Mater_Doc_Ref_Item,Req_Delivery_Date FROM SALES_HISTORY") 

你最好的RDDS之间更好的用户加入,忘了你的自定义功能:

val srcResults = sqlContext.sql("SELECT s.*, p.* FROM SALES_HISTORY s join product_master p on s.Material_ID=p.ID") 
+0

此外,您使用解析您的_Pipe Separated_文件中的代码看起来非常复杂。看看[Spark CSV](https://github.com/databricks/spark-csv),你可以改变列分隔符。 – 2015-02-12 10:48:38

+0

感谢您的建议 – laxman 2015-02-12 16:48:38

+0

感谢您的建议。从表中获取数据之前,我需要做一些data.like如果操作材质ID在这两个表,而不是材质ID我需要Material_Transformation从其他表像product_loop相同 – laxman 2015-02-12 16:57:38

1

这里的答案很短,可能令人失望 - 你根本无法做这样的事情。

在星火一般规律是,你不能从另一个动作和转换触发动作或转换,或者更精确一点,外面的司机星火上下文无法再访问/定义。

相关问题