2016-12-05 20 views
2

使用Spark我正在读取csv并希望将函数应用于csv上的列。我有一些可行的代码,但它非常黑客。什么是正确的方法来做到这一点?将函数应用于Spark中的csv的单个列

我的代码

SparkContext().addPyFile("myfile.py") 
spark = SparkSession\ 
    .builder\ 
    .appName("myApp")\ 
    .getOrCreate() 
from myfile import myFunction 

df = spark.read.csv(sys.argv[1], header=True, 
    mode="DROPMALFORMED",) 
a = df.rdd.map(lambda line: Row(id=line[0], user_id=line[1], message_id=line[2], message=myFunction(line[3]))).toDF() 

我希望能够只需拨打列名称的功能,而不是每一行映射到line然后调用line[index]功能。

我使用的Spark版本2.0.1

回答

7

您可以简单地使用与withColumn结合用户定义函数(udf):

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

udf_myFunction = udf(myFunction, IntegerType()) # if the function returns an int 
df.withColumn("message", udf_myFunction("_3")) #"_3" being the column name of the column you want to consider 

这将新列添加到包含数据框dfmyFunction(line[3])的结果。

+0

太好了,谢谢,不知道有'udf'存在。超级有用。 – Sal

相关问题