2017-07-18 32 views

How to fix AttributeError: 'RDD' object has no attribute '_get_object_id' when using a UDF?

I have the following code:

from pyspark.sql.functions import lit 
from pyspark.sql.functions import UserDefinedFunction 

def aa(a, b):
    if a == 1:
        return 3
    else:
        return 6

example_dataframe = sqlContext.createDataFrame([(1, 1), (2, 2)], ['a', 'b']) 
example_dataframe.show() 
af = UserDefinedFunction(lambda (line_a, line_b): aa(line_a, line_b), StringType()) 
a = af(example_dataframe.rdd) 
print(a) 
example_dataframe.withColumn('c',lit(a)) 
example_dataframe.show() 

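I want to create a new column based on conditions on other attributes. I know the condition can be specified directly in a `withColumn` clause, but I would like to try using a UDF.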

I get an error:

Traceback (most recent call last): 
File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 349, in <module> 
raise Exception(traceback.format_exc()) 
Exception: Traceback (most recent call last): 
File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 337, in <module> 
exec(code) 
File "<stdin>", line 9, in <module> 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/functions.py", line 1848, in __call__ 
jc = self._judf.apply(_to_seq(sc, cols, _to_java_column)) 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 59, in _to_seq 
cols = [converter(c) for c in cols] 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 47, in _to_java_column 
jcol = _create_column_from_name(col) 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 40, in _create_column_from_name 
return sc._jvm.functions.col(name) 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__ 
args_command, temp_args = self._build_args(*args) 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args 
[get_command_part(arg, self.pool) for arg in new_args]) 
File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part 
command_part = REFERENCE_TYPE + parameter._get_object_id() 
AttributeError: 'RDD' object has no attribute '_get_object_id' 

How can I pass attribute values to a UDF?

Answers


You have to pass DataFrame columns to the UDF, not the DataFrame itself (or its RDD, as in the question).

>>> from pyspark.sql.types import * 
>>> example_dataframe.show() 
+---+---+ 
| a| b| 
+---+---+ 
| 1| 1| 
| 2| 2| 
+---+---+ 
>>> af = UserDefinedFunction(lambda line_a, line_b : aa(line_a, line_b), StringType()) 
>>> example_dataframe.withColumn('c', af(example_dataframe['a'], example_dataframe['b'])).show()
+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 1| 3| 
| 2| 2| 6| 
+---+---+---+ 
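
For reference, the same idea can be wrapped in small helper functions. This is only a sketch, not the only way to do it: the helper names `add_c_with_udf` and `add_c_with_when` are made up for illustration, `IntegerType` is my assumption (the snippet above declares `StringType`, but `aa` returns integers), and the second helper shows the plain `withColumn` condition mentioned in the question, built with `when`/`otherwise`.

```python
def aa(a, b):
    """Same rule as in the question: 3 when a == 1, otherwise 6 (b is unused)."""
    return 3 if a == 1 else 6


def add_c_with_udf(df):
    """Return df plus a column 'c' computed row by row with a UDF."""
    # Imported lazily so that aa() stays usable without a Spark installation.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import IntegerType

    # aa returns Python ints, so IntegerType is declared here (an assumption;
    # the answer above declares StringType).
    aa_udf = udf(aa, IntegerType())
    # The UDF is called with Column objects, never with a DataFrame or an RDD.
    return df.withColumn('c', aa_udf(col('a'), col('b')))


def add_c_with_when(df):
    """Same column built from built-in expressions, without a Python UDF."""
    from pyspark.sql.functions import col, when

    return df.withColumn('c', when(col('a') == 1, 3).otherwise(6))
```

With the DataFrame from the question, `add_c_with_udf(example_dataframe).show()` would display the result. Note that `withColumn` returns a new DataFrame and does not modify `example_dataframe` in place, so the result has to be assigned or shown directly.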

Thanks! Great answer! – jartymcfly
