How do I resolve AttributeError: 'RDD' object has no attribute '_get_object_id' when using a UDF?
I have the following code:
from pyspark.sql.functions import lit
from pyspark.sql.functions import UserDefinedFunction
def aa(a, b):
    if (a == 1):
        return 3
    else:
        return 6
example_dataframe = sqlContext.createDataFrame([(1, 1), (2, 2)], ['a', 'b'])
example_dataframe.show()
af = UserDefinedFunction(lambda (line_a, line_b): aa(line_a, line_b), StringType())
a = af(example_dataframe.rdd)
print(a)
example_dataframe.withColumn('c',lit(a))
example_dataframe.show()
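I want to create a new column whose value depends on conditions over other columns. I know the conditions can be expressed directly inside "withColumn", but I would like to try doing it with a UDF.
I get this error: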
Traceback (most recent call last):
  File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 349, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 9, in <module>
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/functions.py", line 1848, in __call__
    jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 59, in _to_seq
    cols = [converter(c) for c in cols]
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 47, in _to_java_column
    jcol = _create_column_from_name(col)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 40, in _create_column_from_name
    return sc._jvm.functions.col(name)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RDD' object has no attribute '_get_object_id'
How can I pass column values to a UDF?
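For reference, the traceback shows the UDF call trying to convert each argument into a Java column (`_to_java_column`), which suggests a UDF expects Column objects or column names rather than an RDD. Below is a minimal sketch of that approach, not a definitive fix. The helper name `add_column_c` is hypothetical, and `IntegerType` is an assumption swapped in for the question's `StringType`, since `aa` returns integers. The tuple-unpacking lambda is also dropped, because that syntax only works in Python 2.

```python
def aa(a, b):
    # Same logic as in the question: only `a` decides the result.
    if a == 1:
        return 3
    return 6


def add_column_c(df):
    """Return a new DataFrame with column 'c' computed from columns 'a' and 'b'.

    `add_column_c` is a hypothetical helper name used for illustration.
    """
    # Imported lazily so that `aa` stays usable without a Spark installation.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # IntegerType is an assumption: it matches the ints that `aa` returns.
    af = udf(aa, IntegerType())

    # Pass Column objects (not df.rdd) to the UDF. withColumn does not modify
    # df in place; it returns a new DataFrame, so the result must be kept.
    return df.withColumn('c', af(df['a'], df['b']))
```

With the `example_dataframe` from the question, `add_column_c(example_dataframe).show()` should display the extra column `c`. Note that the original code discarded the result of `withColumn`, so the later `show()` would not have included the new column even without the error.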
Thanks! Great answer! – jartymcfly