2017-06-05 78 views
0

我有以下UDF:无法执行用户定义的函数

val jac_index:(Array[String],Array[String])=>Float=(Sq1:Array[String],Sq2:Array[String])=> 
{ 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
} 
val jacUDF=udf(jac_index) 

,当我执行下面的句子

val movie_jac_df=movie_pairs_df.withColumn("jac",jacUDF(movie_pairs_df("name"),movie_pairs_df("name2"))) 

我得到“无法执行用户定义的函数”

错误

的架构movie_pairs_df是以下内容

root 
|-- movie: string (nullable = true) 
|-- name: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- movie2: string (nullable = true) 
|-- name2: array (nullable = true) 
| |-- element: string (containsNull = true) 

那么原因是什么?

回答

1

Spark的DataFrame模型Array列为mutable.WrappedArray,这意味着您的UDF应该将两个WrappedArrays作为其输入;

如果更改jac_index期望两个这样的数组:

import scala.collection.mutable 

val jac_index: (mutable.WrappedArray[String], mutable.WrappedArray[String]) => Float = 
    (Sq1, Sq2) => { /* same implementation */ } 

这将正常工作。

+0

它的工作原理,谢谢! – leonfrank

0

定义UDF如下

val jacUDF = udf((Sq1:mutable.WrappedArray[String], Sq2:mutable.WrappedArray[String]) => { 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
})