0
我有一些昂贵的分析需要在对象的DataFrame上执行。设置看起来像这样。为什么Spark SQL UDF比RDD慢?
# This does the expensive work and holds some reference data
# Expensive to initialize so done only once
analyze = Analyze()
def analyze_row(row):
# Turn the row into objects and pass them to the function above
foo = Foo.from_dict(row.foo.asDict(recursive=True))
bar = Bar.from_dict(row.bar.asDict(recursive=True))
return analyze(foo, bar)
当我申请analyze_row
作为UDF像这样
analyze_row_udf = udf(analyze_row, result_schema)
results_df = input_df.withColumn("result", analyze_row_udf).select("result.*")
它是凭经验比它像这样
results = content.rdd.map(analyze_row)
results_df = spark.createDataFrame(results, schema=result_schema)
所有其他条件相同,则UDF施加到RDD慢版本似乎没有在一个小时内取得进展,而RDD版本在30分钟内完成。在这两种情况下,群集CPU均已最大化。同样的行为在多次尝试中被转载。
我认为DataFrames旨在取代RDD,部分原因是因为性能更好。在这种情况下RDD似乎快得多?