2016-12-19 110 views
1

我需要在Spark中使用接受时间戳,整数和另一个数据框的UDF并返回3个值的元组。在Spark Scala中定义UDF

错误发生后,我不断点击错误,我不确定我是否试图修复它。

下面是函数:

def determine_price (view_date: org.apache.spark.sql.types.TimestampType , product_id: Int, price_df: org.apache.spark.sql.DataFrame) : (Double, java.sql.Timestamp, Double) = { 
    var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created"<= view_date) 
    var price_df_joined = price_df_filtered.groupBy("mkt_product_id").agg("view_price" -> "min", "created" -> "max").withColumn("last_view_price_change", lit(1)) 
    var price_df_final = price_df_joined.join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created")).filter($"last_view_price_change" === 1) 
    var result = (price_df_final.select("view_price").head().getDouble(0), price_df_final.select("created").head().getTimestamp(0), price_df_final.select("min(view_price)").head().getDouble(0)) 
    return result 
} 
val det_price_udf = udf(determine_price) 

它给我的错误是:

error: missing argument list for method determine_price 
Unapplied methods are only converted to functions when a function type is expected. 
You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`. 

如果我开始加入我请其他错误,如int预期Int.type发现运行参数或object DataFrame不是包的成员org.apache.spark.sql

给出一些上下文:

这个想法是我有一个价格数据框,一个产品ID和一个创建日期,另一个数据框包含产品ID和查看日期。

我需要确定基于哪个价格是上次创建的价格条目早于查看日期的价格。

由于每个产品ID在第二个数据框中有多个查看日期。我认为UDF比交叉连接更快。如果有人有不同的想法,我会很感激。

回答

0

由于UDF将在特定分区上的工作人员上运行,因此无法传递UDF中的数据框。因为你不能在Worker上使用RDD(Is it possible to create nested RDDs in Apache Spark?),所以同样你也不能在Worker上使用DataFrame。

您需要为此做一些工作!

+0

好吧,我从de UDF参数中删除了数据框。数据帧被缓存和播放,它应该可以从函数内部访问我仍然得到错误:'错误:类型不匹配; found:Int.type required:Int val det_price_udf = udf(determine_price(org.apache.spark.sql.types.TimestampType,Int))' – UrVal

+0

看起来,如果数据框不在UDF中,它不可能是用过的。这不像我习惯于Python的“全局变量”。不知道如何解决这个问题。 – UrVal

+0

你的用例是什么? –