2017-08-03 14 views
1

我有2个dataframes,我想找到除2等于所有列(surrogate_key,电流)的记录火花保存服用大量的时间

然后,我要保存新surrogate_key值的记录。

以下是我的代码:

val seq = csvDataFrame.columns.toSeq 
var exceptDF = csvDataFrame.except(csvDataFrame.as('a).join(table.as('b),seq).drop("surrogate_key","current")) 
exceptDF.show() 

exceptDF = exceptDF.withColumn("surrogate_key", makeSurrogate(csvDataFrame("name"), lit("ecc"))) 
exceptDF = exceptDF.withColumn("current", lit("Y")) 

exceptDF.show() 

exceptDF.write.option("driver","org.postgresql.Driver").mode(SaveMode.Append).jdbc(postgreSQLProp.getProperty("url"), tableName, postgreSQLProp) 

该代码给出正确的结果,但在写这些结果postgre卡住。

不知道是什么问题。还有没有更好的方法呢?

问候, Sorabh

+0

还显示()写入正确postgre打印数据帧,但后来在写之前需要太多的时间。 –

+1

'show'不会对整个数据执行转换,只需要显示多少数据(默认为20)。在写入postgres之前,您需要执行缓存+操作,然后您可以测量写入postgres的实际时间。 “很多时间”是一个非常广泛的描述,你有多少数据以及它实际上需要多少时间... – eliasah

+0

嗨Eliasah,我用过cache()+ count(),它需要大约1/2小时只有3行10列,延迟也不是由于写入Postgre。在添加count()之后,它的count()现在需要时间。 –

回答

1

默认情况下,spark-sql会创建200个分区,这意味着当您尝试保存datafrmae时,它将被保存在200个parquet文件中。您可以使用以下技术减少Dataframe的分区数量。

  1. 在应用程序级别。设置参数 “spark.sql.shuffle.partitions” 如下:

    sqlContext.setConf( “spark.sql.shuffle.partitions”, “10”)

  2. 减少分区数,该特定据帧如下:

    df.coalesce(10).write.save(...)

希望它帮助。

问候,

Neeraj

+0

Thanks Neeraj,That helps .. :) –

0

使用var对数据帧不建议的,你应该总是使用val和数据帧进行一番改造后,创建一个新的数据帧。

请删除所有的var并替换为val

希望这会有所帮助!

+0

谢谢,尚卡尔。即使将var更改为val,问题仍然存在。 –