2015-07-10 70 views
10

我正在使用Spark 1.3.0和Python。我有一个数据框,我希望添加一个从其他列派生的额外列。与此类似,在从其他列派生的数据框中添加新列(Spark)

>>old_df.columns 
[col_1, col_2, ..., col_m] 

>>new_df.columns 
[col_1, col_2, ..., col_m, col_n] 

其中

col_n = col_3 - col_4 

如何在PySpark做到这一点?为实现这一

回答

17

一种方法是使用withColumn方法:

old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2')) 

new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2) 

或者您可以使用SQL上的注册表格:

old_df.registerTempTable('old_df') 
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df') 
+0

嘿@ zero323,如果我想创建一个列,比如Col_1是字符串,col_2是字符串,我希望column_n为col_1和Col_2的连接。即Col_1是零并且column_2是323,Column_n应该是零3232? – Jason

+0

@Jason http://stackoverflow.com/a/31452109/1560062 – zero323

+0

谢谢@ zero323。虽然我有这个问题: df.select(concat(col(“k”),lit(“”),col(“v”)))如何在这里创建第三列? – Jason

3

此外,我们可以使用UDF

from pyspark.sql.functions import udf,col 
from pyspark.sql.types import IntegerType 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 

sc = SparkContext() 
sqlContext = SQLContext(sc) 
old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2')) 
function = udf(lambda col1, col2 : col1-col2, IntegerType()) 
new_df = old_df.withColumn('col_n',function(col('col_1'), col('col_2'))) 
new_df.show() 
相关问题