2017-06-06 64 views
3

我正在使用Apache Spark 2.0数据框/数据集API 我想从值列表向我的数据框添加一个新列。我的列表与给定的数据帧具有相同数量的值。Apache Spark如何将新列从列表/数组添加到Spark数据框

val list = List(4,5,10,7,2) 
val df = List("a","b","c","d","e").toDF("row1") 

我想这样做:

val appendedDF = df.withColumn("row2",somefunc(list)) 
df.show() 
// +----+------+ 
// |row1 |row2 | 
// +----+------+ 
// |a |4 | 
// |b |5 | 
// |c |10 | 
// |d |7 | 
// |e |2 | 
// +----+------+ 

对于任何想法,我将不胜感激,我在现实中数据帧中包含多个列。作为DataFrame表明,这是一个小的数据帧的事实,(它存在于驾驶员记忆)输入list具有相同的大小:

+0

如果列表和DF大小不同,会发生什么?仅包含较大集合中的前N个项目(其中N =较短集合的大小)? –

+0

在我的情况下,我知道它将永远是相同的长度 –

+0

您也可以将列表转换为数据帧。然后将row_number添加到它们并按row_number进行连接。 –

回答

5

你可以做这样的:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._  

// create rdd from the list 
val rdd = sc.parallelize(List(4,5,10,7,2)) 
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28 

// zip the data frame with rdd 
val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2))) 
// rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32 

// create a new data frame from the rdd_new with modified schema 
spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show 
+----+-------+ 
|row1|new_col| 
+----+-------+ 
| a|  4| 
| b|  5| 
| c|  10| 
| d|  7| 
| e|  2| 
+----+-------+ 
4

添加的完整性首先 - 所以你可能会考虑collect() -ing它与list荏苒,如果需要转换回一个DataFrame

df.collect() 
    .map(_.getAs[String]("row1")) 
    .zip(list).toList 
    .toDF("row1", "row2") 

这不会是快,但如果数据非常小,可能可以忽略不计,代码(可以说)更清晰。

+1

我真的很喜欢这个答案,我认为对于小数据集来说它是完全可行的 –

相关问题