2017-02-08 74 views
0

我有几个UDF,我想随同数据框一起作为函数参数传递。Spark UDF作为函数参数,UDF不在函数范围内

这样做的一种方式可能是在函数中创建UDF,但是这会创建并销毁UDF的多个实例而不重用它,这可能不是解决此问题的最佳方法。

这里的代码样片 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 

val df = inputDF1 
    .withColumn("new_col", lkpUDF(col("c1"))) 
val df2 = inputDF2. 
    .withColumn("new_col", lkpUDF(col("c1"))) 

而不是做上述情况,我非常想要做这样的事情 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = { 

    df 
     .withColumn("new_col", lkpUDF(col("c1"))) 

    } 
val df = appendCols(inputDF, lkpUDF) 

以上UDF是非常简单的,但在我的情况下,它可以返回一个原始类型或用户定义的案例类类型。任何想法/指针将不胜感激。谢谢。

回答

3

您与适当的签名功能需要是这样的:

import org.apache.spark.sql.UserDefinedFunction 

def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = { 
    df.withColumn("new_col", func(col("col1"))) 
} 

斯卡拉REPL是在返回初始化值的类型非常有用。

scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType)) 

此外,如果你通入udf包装函数的签名由一个Any返回类型(这将是的情况下,如果该函数可以返回一个原语或用户定义的情况下的类), UDF将无法编译,例如:

java.lang.UnsupportedOperationException: Schema for type Any is not supported 
+0

谢谢septra。你是对的,我遇到了上述错误。但即使在我仅尝试返回一个案例类时,我似乎也遇到了这个错误。 http://stackoverflow.com/questions/42121649/schema-for-type-any-is-not-supported – Yash