0

我必须编写一个复杂的UDF,其中我必须使用不同的表进行连接,并返回匹配数。实际使用情况要复杂得多,但我已将这种情况简化为最小可重现代码。这是UDF代码。Spark:加入UDF或map函数

def predict_id(date,zip): 
    filtered_ids = contest_savm.where((F.col('postal_code')==zip) & (F.col('start_date')>=date)) 
    return filtered_ids.count() 

当我定义使用UDF下面的代码,我得到的控制台错误的一个长长的清单:

predict_id_udf = F.udf(predict_id,types.IntegerType()) 

错误的最后一行是:

py4j.Py4JException: Method __getnewargs__([]) does not exist 

我想要知道什么是最好的方式去做。我也试过map这样的:

result_rdd = df.select("party_id").rdd\ 
    .map(lambda x: predict_id(x[0],x[1]))\ 
    .distinct() 

这也导致了类似的最终误差。我想知道,如果有的话,我可以在UDF或map函数中对原始数据框的每一行进行连接。

回答

0

我必须编写一个复杂的UDF,其中我必须使用不同的表进行连接,并返回匹配数。

这是不可能的设计。我想要实现这样的效果,您必须使用高级DF/RDD运算符:

df.join(ontest_savm, 
    (F.col('postal_code')==df["zip"]) & (F.col('start_date') >= df["date"]) 
).groupBy(*df.columns).count()