2017-09-07 65 views
0

我有一个循环,它在每次迭代中生成行。我的目标是创建一个具有给定模式的数据框,该数据框仅包含那些行。我心里有一组要遵循的步骤,但我不能够在新在每次循环迭代Spark - 从循环中生成的行列表创建数据框

我尝试以下方法添加到List[Row]

var listOfRows = List[Row]() 

val dfToExtractValues: DataFrame = ??? 

dfToExtractValues.foreach { x => 

    //Not really important how to generate here the variables 
    //So to simplify all the rows will have the same values 

    var col1 = "firstCol" 
    var col2 = "secondCol" 
    var col3 = "thirdCol" 

    val newRow = RowFactory.create(col1,col2,col3) 

    //This step I am not able to do 
    //listOfRows += newRow  -> Just for strings 
    //listOfRows.add(newRow)  -> This add doesnt exist, it is a addString 
    //listOfRows.aggregate(1)(newRow)  -> This is not how aggreage works... 
} 


val rdd = sc.makeRDD[RDD](listOfRows) 

val dfWithNewRows = sqlContext.createDataFrame(rdd, myOriginalDF.schema) 

有人能告诉我我做错了什么,或者我在改变生成一个数据框的方法时会改变什么?

也许有更好的方法来收集行而不是List [Row]。但是,我需要将其他类型的集合转换为数据框。

回答

1

有人能告诉我什么我做错了

瓶盖

首先它看起来像你的编程指南中跳过Understanding Closures。任何尝试修改通过闭包传递的变量都是徒劳的。您所能做的只是修改副本,并且更改不会全局反映出来。

变量不使对象变更:

var listOfRows = List[Row]() 

创建一个变量。分配List是一样不变的。如果在Spark背景是不是你可以创建一个新的List并重新分配:

listOfRows = newRow :: listOfRows 

请注意,我们不perpend追加 - 你不想追加到循环列表。

当您想共享数据(例如Akka中的常见模式)时,带有不可变对象的变量很有用,但在Spark中没有多少应用程序。

让事情分布:

最后从不取数据的驱动器只是为了再次分发。您还应该避免在RDDsDataFrames之间进行不必要的转换。最好是使用DataFrame运营商一路:

dfToExtractValues.select(...) 

但如果你需要更复杂的东西map

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

dfToExtractValues.map(x => ...)(RowEncoder(schema))