2016-12-15 101 views
1

选项我有一个数据集是这样的:如何使用星火UDF

+----+------+ 
|code|status| 
+-----------+ 
| 1| "new"| 
| 2| null| 
| 3| null| 
+----+------+ 

我想编写依赖于两列的UDF。

我得到它的工作按照this answer第二种方法是处理null的UDF之外,写myFn采取布尔作为第二个参数:

df.withColumn("new_column", 
    when(pst_regs("status").isNull, 
    myFnUdf($"code", lit(false)) 
) 
    .otherwise(
    myFnUdf($"code", lit(true)) 
) 
) 

要在UDF处理空的方法我看着是this answer,谈论“用Options包装参数”。我想这样的代码:

df.withColumn("new_column", myFnUdf($"code", $"status")) 

def myFn(code: Int, status: String) = (code, Option(status)) match { 
    case (1, "new") => "1_with_new_status" 
    case (2, Some(_)) => "2_with_any_status" 
    case (3, None) => "3_no_status" 
} 

但随着null一个行给出type mismatch; found :None.type required String。我也尝试在udf创建期间用Option包装参数而没有成功。这个(没有选项)的基本形式如下:

myFnUdf = udf[String, Int, String](myFn(_:Int, _:String)) 

我是新来的Scala,所以我敢肯定,我失去了一些东西简单。我的一些混淆可能是从功能创建udfs的不同语法(例如,根据https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-udfs.html),所以我不确定我是否使用了最好的方法。任何帮助感谢!

编辑

编辑补充缺少的每@ user6910411和@sgvd评论(1, "new")情况。

回答

1

首先,可能有一些您正在使用的代码,我们在这里丢失。当我尝试您的示例myFn,与val myFnUdf = udf(myFn _)一起制作为UDF并使用df.withColumn("new_column", myFnUdf($"code", $"status")).show运行时,我没有发现类型不匹配,而是输入MatchError,这同样也是user6910411指出的。这是因为没有模式匹配(1, "new")

除此之外,虽然通常使用Scala的选项比使用原始值更好,但在这种情况下您不必这样做。下面的示例适用于null直接:

val my_udf = udf((code: Int, status: String) => status match { 
    case null => "no status" 
    case _ => "with status" 
}) 

df.withColumn("new_column", my_udf($"code", $"status")).show 

结果:

+----+------+-----------+ 
|code|status| new_column| 
+----+------+-----------+ 
| 1| new|with status| 
| 2| null| no status| 
| 2| null| no status| 
+----+------+-----------+ 

包装与选项后仍然工作,虽然:

val my_udf = udf((code: Int, status: String) => Option(status) match { 
    case None => "no status" 
    case Some(_) => "with status" 
}) 

这给了相同的结果。

+0

谢谢@sgvd。我使用这两种方法(并更新了问题以包含丢失的案例)。感谢你的帮助。 –