2017-02-08 18 views
0

为了能够使用我的DataFrame的列名而不转义.我需要一个函数来“验证”所有的列名 - 但我尝试的方法都没有在及时(我5分钟后中止)。Scala Spark:性能问题重命名大量列

我正在尝试我的算法的数据集是golub数据集(获取它here)。这是一个具有7200列的2.2MB CSV文件。重命名所有列应该是秒

代码读取CSV在

var dfGolub = spark.read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv("golub_merged.csv") 
    .drop("_c0") // drop the first column 
    .repartition(numOfCores) 

尝试重命名列的问题:

def validifyColumnnames1(df : DataFrame) : DataFrame = { 
    import org.apache.spark.sql.functions.col 
    val cols = df.columns 
    val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.",""))) 
    df.select(colsRenamed : _*) 
} 


def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = { 
    val newColumnNames = ArrayBuffer[String]() 
    for(oldCol <- df.columns) { 
     newColumnNames += oldCol.replaceAll("\\.","") 
    } 
    df.toDF(newColumnNames : _*) 
} 

def validifyColumnnames3(df : DataFrame) : DataFrame = { 
    var newDf = df 
    for(col <- df.columns){ 
     newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.","")) 
    } 
    newDf 
} 

任何想法是什么原因造成这种性能问题?

设置:我在Ubuntu 16.04中local[24]模式的机器上运行星火2.1.0与16cores * 2个线程的RAM

+6

读取没有列名称的数据为RDD,然后只读取作为架构的列名称。结合架构和RDD来获得你的DF。 – toofrellik

回答

2

96GB和假设你知道的类型,你可以简单地创建模式,而不是infering的它(推断模式成本的性能,甚至可能是错误的csv)。

让我们假设为简单起见,你有文件example.csv如下:事先可以

val scehma = StructType(Seq(StructField("A_B",StringType),StructField("A_C", IntegerType), StructField("AD", IntegerType))) 
val df = spark.read.option("header","true").schema(scehma).csv("example.csv") 
df.show() 

+---+---+---+ 
|A_B|A_C| AD| 
+---+---+---+ 
| a| 3| 1| 
+---+---+---+ 

如果你不知道的信息:

A.B, A.C, A.D 
a,3,1 

你可以做这样的事情如前所述使用推理架构,那么您可以使用数据框生成架构:

val fields = for { 
    x <- df.schema 
} yield StructField(x.name.replaceAll("\\.",""), x.dataType, x.nullable) 
val schema = StructType(fields) 

并重读使用该架构的数据帧,如前所述