2017-08-03 120 views
0

我有一些昂贵的分析需要在对象的DataFrame上执行。设置看起来像这样。为什么Spark SQL UDF比RDD慢?

# This does the expensive work and holds some reference data 
# Expensive to initialize so done only once 
analyze = Analyze() 

def analyze_row(row): 
    # Turn the row into objects and pass them to the function above 
    foo = Foo.from_dict(row.foo.asDict(recursive=True)) 
    bar = Bar.from_dict(row.bar.asDict(recursive=True)) 
    return analyze(foo, bar) 

当我申请analyze_row作为UDF像这样

analyze_row_udf = udf(analyze_row, result_schema) 
results_df = input_df.withColumn("result", analyze_row_udf).select("result.*") 

它是凭经验比它像这样

results = content.rdd.map(analyze_row) 
results_df = spark.createDataFrame(results, schema=result_schema) 

所有其他条件相同,则UDF施加到RDD慢版本似乎没有在一个小时内取得进展,而RDD版本在30分钟内完成。在这两种情况下,群集CPU均已最大化。同样的行为在多次尝试中被转载。

我认为DataFrames旨在取代RDD,部分原因是因为性能更好。在这种情况下RDD似乎快得多?

回答

1

DataFrames可以取代其中RDDS:(可应用于这里无)

  • 有执行计划优化。
  • 有低级别的优化使用 - 断堆内存,
  • 优化柱状存储用于码生成(再一次,当你外JVM执行黑盒代码没有被施加) - (同上)。

此外,在上下文之间传递数据很昂贵,合并部分结果需要额外的操作。它也超过了内存需求的两倍。

很难说为什么RDD在你的情况下严格更快(有显着的改进时间,而你没有提供版本),但我猜你碰到了一些案例边界案例。

总的来说,对于任意Python代码DataFrames根本不是更好的选择。对于使用箭头支持的矢量化操作,将来可能会有所改变。